[
https://issues.apache.org/jira/browse/SAMZA-1665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490386#comment-16490386
]
Yi Pan (Data Infrastructure) commented on SAMZA-1665:
-----------------------------------------------------
There is another interleaving in this race condition that causes
LocalApplicationRunner.waitForFinish() to wait forever:
LocalApplicationRunner main thread:
# stop() is called; it sets the flag and calls StreamProcessor.stop()
asynchronously, which stops only the container on the first invocation. The JC
is expected to be stopped on a second invocation, via the ContainerListener,
after the container has properly shut down.
# waitForFinish() waits on the shutdownLatch in LocalApplicationRunner,
expecting the JC stop to be called and to count down the latch.
SamzaContainer thread:
# starts the proper shutdown sequence
Debounce timer thread:
# detects a change in the number of processors and triggers JobModel expiration
# calls JobCoordinatorListener.onJobModelExpired(), which invokes
container.pause(); this sets the pause flag and shuts down the container while
the SamzaContainer thread is already in the middle of its shutdown sequence
# waits on jcContainerShutdownLatch
SamzaContainer thread:
# continues with the shutdown sequence and completes it
# because the container's pause flag was set by the Debounce timer thread,
invokes containerListener.onContainerStop(pause==true), which assumes that the
container was shut down by the active JC in reaction to a JobModel change.
Hence, it only counts down jcContainerShutdownLatch and returns after the
shutdown sequence, leaving the JC running.
Debounce timer thread:
# wakes from jcContainerShutdownLatch.await() and continues as if the job model
is to be updated
# starts a new container; the JC stays alive and never counts down
LocalApplicationRunner.shutdownLatch (this latch is expected to be counted down
by JobCoordinatorListener.onCoordinatorStop() -> processorListener.onShutdown())
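To make the interleaving above concrete, here is a deliberately simplified toy model. The class and method names (ToyProcessor, RaceReplay, onNewJobModel(), the stopRequested flag) are hypothetical and only mirror the roles described in the steps; this is not Samza's actual code. The steps are replayed in order on a single thread so the outcome is deterministic rather than timing-dependent.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical toy model of the callbacks described above; not Samza's API.
class ToyProcessor {
    // Counted down when the JC stops; waitForFinish() blocks on this latch.
    final CountDownLatch shutdownLatch = new CountDownLatch(1);
    // Counted down when the container stops because the JC paused it.
    final CountDownLatch jcContainerShutdownLatch = new CountDownLatch(1);

    volatile boolean stopRequested = false;   // set by the user's stop()
    volatile boolean containerPaused = false; // set by container.pause()
    volatile boolean jcRunning = true;
    volatile int containersStarted = 1;

    // Main thread, first stop() invocation: record the request; the container
    // then begins shutting down on its own thread.
    void stop() { stopRequested = true; }

    // Debounce thread: JobModel expired, so pause the container.
    void onJobModelExpired() { containerPaused = true; }

    // Container thread: shutdown finished. Like the listener described above,
    // this looks only at the pause flag and ignores stopRequested.
    void onContainerStop() {
        if (containerPaused) {
            jcContainerShutdownLatch.countDown(); // treated as a JC-driven pause
        } else {
            jcRunning = false;                    // treated as a user stop
            shutdownLatch.countDown();
        }
    }

    // Debounce thread, after the await returns: start the new container.
    void onNewJobModel() {
        containerPaused = false;
        containersStarted++;
    }
}

class RaceReplay {
    // Replays the interleaving in order on one thread, so the outcome is deterministic.
    static ToyProcessor replay() {
        ToyProcessor p = new ToyProcessor();
        p.stop();                // main thread
        p.onJobModelExpired();   // debounce thread, mid-shutdown
        p.onContainerStop();     // container thread sees containerPaused == true
        if (p.jcContainerShutdownLatch.getCount() == 0) {
            p.onNewJobModel();   // debounce thread wakes and starts a new container
        }
        return p;
    }

    public static void main(String[] args) throws InterruptedException {
        ToyProcessor p = replay();
        // waitForFinish() would block forever here; a timeout makes that visible.
        boolean finished = p.shutdownLatch.await(100, TimeUnit.MILLISECONDS);
        System.out.println("finished=" + finished + " jcRunning=" + p.jcRunning
                + " containersStarted=" + p.containersStarted);
    }
}
```

Running RaceReplay.main prints finished=false jcRunning=true containersStarted=2: the JC is still alive, a new container has been started, and the latch that waitForFinish() blocks on is never released.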
As a result, user code calling LocalApplicationRunner.waitForFinish() hangs
forever. Enabling kill() and waitForFinish() in
TestZkLocalApplicationRunner#shouldUpdateJobModelWhenNewProcessorJoiningGroupUsingAllSspToSingleTaskGrouperFactory
easily reproduces the hang.
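One possible direction for closing this window, sketched against the same kind of toy model and not taken from the actual SAMZA-1665 patch, is to let a stop request take precedence over a pause. The stop, JobModel-expiration and container-stop callbacks read and update shared state under a single lock, so the container-stop callback finishes the shutdown whenever a stop was requested, even if the pause flag is set. All names (GuardedProcessor, containerRunning, finishShutdown(), replayCommentSequence()) are hypothetical, and the one-shot latches assume at most one pause per processor.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical fixed variant of the toy model; not the actual Samza patch.
// One lock orders stop(), onJobModelExpired() and onContainerStop(),
// and a stop request always takes precedence over a pause.
class GuardedProcessor {
    final CountDownLatch shutdownLatch = new CountDownLatch(1);
    // One-shot for brevity: assumes at most one pause per processor.
    final CountDownLatch jcContainerShutdownLatch = new CountDownLatch(1);

    private final Object lock = new Object();
    private boolean stopRequested = false;   // guarded by lock
    private boolean paused = false;          // guarded by lock
    private boolean containerRunning = true; // guarded by lock
    volatile boolean jcRunning = true;
    volatile int containersStarted = 1;

    // Main thread: record the stop; finish at once if no container is left to stop.
    void stop() {
        boolean finishNow;
        synchronized (lock) {
            stopRequested = true;
            finishNow = !containerRunning;
        }
        if (finishNow) {
            finishShutdown();
        }
        // Otherwise the container shuts down and onContainerStop() finishes the job.
    }

    // Debounce thread: returns false if the expiration is ignored because of a stop.
    boolean onJobModelExpired() {
        synchronized (lock) {
            if (stopRequested || !containerRunning) {
                return false;
            }
            paused = true;
            return true;
        }
    }

    // Container thread: treat the stop as a pause only if no stop was requested.
    void onContainerStop() {
        boolean pauseOnly;
        synchronized (lock) {
            containerRunning = false;
            pauseOnly = paused && !stopRequested;
        }
        if (pauseOnly) {
            jcContainerShutdownLatch.countDown();
        } else {
            finishShutdown();
        }
    }

    // Debounce thread, after the await: restart only if no stop has been requested.
    boolean onNewJobModel() {
        synchronized (lock) {
            if (stopRequested) {
                return false;
            }
            paused = false;
            containerRunning = true;
            containersStarted++;
            return true;
        }
    }

    private void finishShutdown() {
        jcRunning = false;                      // stop the JC
        shutdownLatch.countDown();              // unblocks waitForFinish()
        jcContainerShutdownLatch.countDown();   // release a waiting debounce thread, if any
    }

    // Replays the interleaving from the comment against the guarded model.
    static GuardedProcessor replayCommentSequence() {
        GuardedProcessor p = new GuardedProcessor();
        p.stop();                                  // main thread
        boolean accepted = p.onJobModelExpired();  // debounce thread: ignored
        p.onContainerStop();                       // container thread: finishes shutdown
        if (accepted && p.jcContainerShutdownLatch.getCount() == 0) {
            p.onNewJobModel();                     // not reached in this interleaving
        }
        return p;
    }
}
```

In this sketch the opposite ordering is also covered: if the pause completes first and stop() arrives before the debounce thread restarts anything, stop() sees that no container is running and finishes the shutdown itself, and onNewJobModel() then refuses to start a container.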
> Fix race condition when stopping StreamProcessor.
> -------------------------------------------------
>
> Key: SAMZA-1665
> URL: https://issues.apache.org/jira/browse/SAMZA-1665
> Project: Samza
> Issue Type: Bug
> Reporter: Shanthoosh Venkataraman
> Assignee: Shanthoosh Venkataraman
> Priority: Major
>
> When the user thread stops the StreamProcessor while the onNewJobModelExpired
> event is being executed on the debounce thread, we observe the following
> issue:
> A. The user thread stops the StreamProcessor, which in turn stops the
> currently running container.
> B. Before the ZkJobCoordinator is shut down, onNewJobModelExpired is executed
> on the debounce thread (which spawns a new container).
> C. The user thread stops the ZkJobCoordinator.
> After the StreamProcessor shutdown sequence returns, the container thread is
> alive and the ZkJobCoordinator is dead. This results in an orphaned container
> when stopping the StreamProcessor. We were able to reproduce this locally.
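The A/B/C sequence in the original description is a check-then-act race between the user's stop and the debounce thread's container spawn. The toy below is hypothetical and not Samza code: it replays the three steps in order, with a real thread standing in for the container. With guardSpawn off it leaves a container running after the JC is gone. The guard is an assumption rather than the actual fix: it records the stop and reads the current container under the same lock the spawn path takes, so a spawn either happens before the stop (and its container is the one that gets stopped) or sees the stop flag and does nothing.

```java
// Hypothetical toy of steps A, B and C; not Samza code.
class OrphanDemo {
    private final Object lock = new Object();
    private final boolean guardSpawn;   // true = apply the hypothetical guard in step B
    private boolean stopping = false;   // guarded by lock
    private Thread container;           // guarded by lock
    private volatile boolean jcAlive = true;

    OrphanDemo(boolean guardSpawn) {
        this.guardSpawn = guardSpawn;
        synchronized (lock) { container = startContainer(); }
    }

    // Toy "container": a daemon thread that runs until it is interrupted.
    private static Thread startContainer() {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // interrupted: the container has shut down
            }
        });
        t.setDaemon(true); // a leaked container must not keep the JVM alive
        t.start();
        return t;
    }

    // Step A (user thread): mark the processor as stopping and stop the current container.
    void stopContainer() throws InterruptedException {
        Thread current;
        synchronized (lock) {
            stopping = true;
            current = container;
        }
        current.interrupt();
        current.join();
    }

    // Step B (debounce thread): JobModel expired, spawn a replacement container.
    void onJobModelExpired() {
        synchronized (lock) {
            if (guardSpawn && stopping) {
                return; // guard: never spawn a container while stopping
            }
            container = startContainer();
        }
    }

    // Step C (user thread): stop the JC.
    void stopJobCoordinator() { jcAlive = false; }

    // Replays A, B, C in order; returns true if a container outlives the JC.
    static boolean leaksContainer(boolean guardSpawn) {
        OrphanDemo p = new OrphanDemo(guardSpawn);
        try {
            p.stopContainer();        // A
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        p.onJobModelExpired();        // B
        p.stopJobCoordinator();       // C
        synchronized (p.lock) {
            boolean orphan = p.container.isAlive() && !p.jcAlive;
            p.container.interrupt();  // clean up the toy thread either way
            return orphan;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded leaks: " + leaksContainer(false));
        System.out.println("guarded leaks: " + leaksContainer(true));
    }
}
```

OrphanDemo.main reports a leaked container only for the unguarded run. Doing the flag write and the container read in one critical section matters here: checking a stopping flag without the lock would just move the race rather than remove it.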
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)