[ 
https://issues.apache.org/jira/browse/SAMZA-1665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490386#comment-16490386
 ] 

Yi Pan (Data Infrastructure) commented on SAMZA-1665:
-----------------------------------------------------

There is another sequence in this race condition that causes 
LocalApplicationRunner.waitForFinish() to wait forever:
LocalApplicationRunner main thread: 
# stop() is called; it sets the flag and calls StreamProcessor.stop() 
asynchronously, which only stops the container on the first invocation. 
Stopping the JC is expected to happen on a second invocation via 
ContainerListener, after the container has shut down properly.
# waitForFinish() waits on the shutdownLatch in LocalApplicationRunner, 
which expects the JC stop to be called and to count down the latch.

SamzaContainer thread:
# Starts the proper shutdown sequence

Debounce timer thread:
# Detects a change in the number of processors and triggers JobModel expiration
# Calls JobCoordinatorListener.onJobModelExpired(), which invokes 
container.pause(). This sets the pause flag and initiates shutdown of the 
container while the SamzaContainer thread is already in the middle of its 
shutdown sequence
# Waits on jcContainerShutdownLatch

SamzaContainer thread:
# Continues with the shutdown sequence and completes it
# Because the container's pause flag was set by the Debounce timer thread, it 
invokes containerListener.onContainerStop(pause==true), which assumes that the 
container was shut down by the active JC in reaction to a JobModel change. 
Hence, it only counts down jcContainerShutdownLatch and returns after the 
shutdown sequence, leaving the JC running.

Debounce timer thread:
# Wakes up from jcContainerShutdownLatch.await() and continues as if the job 
model is to be updated
# A new container is started and the JC stays alive, without counting down 
LocalApplicationRunner.shutdownLatch (this latch is expected to be counted 
down by JobCoordinatorListener.onCoordinatorStop() -> 
processorListener.onShutdown())

As a result, user code that calls LocalApplicationRunner.waitForFinish() hangs 
forever. Enabling kill() and waitForFinish() in 
TestZkLocalApplicationRunner#shouldUpdateJobModelWhenNewProcessorJoiningGroupUsingAllSspToSingleTaskGrouperFactory
 easily reproduces the hang.
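
To make the interleaving above easier to follow, here is a minimal toy model 
in Java. It is not Samza code: the class ShutdownRaceSketch and all of its 
methods are hypothetical stand-ins, and the only things taken from the 
description above are the two latches and the pause flag. The steps run 
sequentially in one thread, in the order listed, so the outcome is 
deterministic, and a short timeout stands in for the indefinite wait.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Toy model of the interleaving described above. NOT Samza code:
// every name here is a hypothetical stand-in chosen for illustration.
class ShutdownRaceSketch {
    // Stands in for LocalApplicationRunner.shutdownLatch.
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    // Stands in for jcContainerShutdownLatch.
    private final CountDownLatch jcContainerShutdownLatch = new CountDownLatch(1);
    // Stands in for the container's pause flag.
    private boolean paused = false;

    // Debounce timer thread: onJobModelExpired() -> container.pause().
    void pauseContainer() {
        paused = true;
    }

    // SamzaContainer thread: called at the end of the shutdown sequence.
    void onContainerStop() {
        if (paused) {
            // Assumed to be a JC-driven restart: only release the debounce thread.
            jcContainerShutdownLatch.countDown();
        } else {
            // Assumed to be a user-driven stop: stop the JC, which releases the runner.
            shutdownLatch.countDown();
        }
    }

    // Models waitForFinish() with a timeout; returns true if shutdown completed.
    boolean waitForFinish(long timeoutMillis) {
        try {
            return shutdownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Replays the steps in order; the flag selects the racy interleaving.
    static boolean runInterleaving(boolean debounceFiresDuringShutdown) {
        ShutdownRaceSketch sketch = new ShutdownRaceSketch();
        // Main thread: stop() has already started the container shutdown.
        if (debounceFiresDuringShutdown) {
            sketch.pauseContainer();   // debounce thread sets the pause flag
        }
        sketch.onContainerStop();      // container thread finishes shutdown
        return sketch.waitForFinish(100);
    }

    public static void main(String[] args) {
        System.out.println("clean stop finished: " + runInterleaving(false)); // true
        System.out.println("racy stop finished: " + runInterleaving(true));   // false
    }
}
```

In the racy case only the stand-in for jcContainerShutdownLatch is released, 
so the modeled waitForFinish() times out. With the real untimed wait, that 
corresponds to the hang described above.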

> Fix race condition when stopping StreamProcessor.
> -------------------------------------------------
>
>                 Key: SAMZA-1665
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1665
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major
>
> When the user thread stops the StreamProcessor and the onNewJobModelExpired 
> event is executed from the debounce thread at the same time, we observe the 
> following issue:
> A. The user thread stops the StreamProcessor, which in turn stops the 
> currently running container.
> B. Before ZkJobCoordinator is shut down, onNewJobModelExpired is executed from 
> the debounce thread (which spawns a new container). 
> C. The user thread stops the ZkJobCoordinator. 
> After the StreamProcessor shutdown sequence returns, the container thread is 
> alive and the ZkJobCoordinator is dead. This results in an orphaned container 
> when stopping the stream processor. We were able to reproduce this locally.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
