Shanthoosh Venkataraman created SAMZA-1730:
----------------------------------------------

             Summary: Add state validations in StreamProcessor.
                 Key: SAMZA-1730
                 URL: https://issues.apache.org/jira/browse/SAMZA-1730
             Project: Samza
          Issue Type: Bug
            Reporter: Shanthoosh Venkataraman
            Assignee: Shanthoosh Venkataraman


The existing StreamProcessor implementation has no state variable to 
represent its current state (e.g. whether it is in the rebalance, shutdown, or 
running phase). The absence of this information leads to the following two problems.

- When StreamProcessor.stop() and 
JobCoordinatorListener.onNewJobModel() execute concurrently, a SamzaContainer 
might still be running after the StreamProcessor has been stopped (due to the 
interleaved execution order). Here is a sample execution order:

1. The user thread invokes `SamzaContainer.stop()`. 
2. `StreamProcessor.stop()` stops the currently running SamzaContainer.
3. Before the StreamProcessor stops the ZkJobCoordinator, the ZkJobCoordinator 
initializes and starts a new SamzaContainer (due to a change in the global 
processor group).
4. The StreamProcessor stops the ZkJobCoordinator, leaving the new 
SamzaContainer running.
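The following replays the interleaving above on a single thread to make the failure concrete. It is a minimal sketch under stated assumptions: `StopRaceDemo`, `StatelessProcessor`, `FakeContainer`, `stopContainer()` and `stopCoordinator()` are hypothetical stand-ins, not Samza classes. The only point it models is that, with no lifecycle state, the `onNewJobModel` callback cannot tell that a stop is already in progress.

```java
/**
 * Hypothetical, single-threaded replay of the interleaving above, using a
 * processor with no lifecycle state. These are NOT Samza's actual classes.
 */
public class StopRaceDemo {
    static final class FakeContainer {
        boolean running;
        void start() { running = true; }
        void shutdown() { running = false; }
    }

    /** Every callback acts unconditionally because there is no state to check. */
    static final class StatelessProcessor {
        FakeContainer container;
        boolean coordinatorRunning = true;

        // Coordinator callback: starts a container for the new job model.
        void onNewJobModel() {
            container = new FakeContainer();
            container.start();
        }

        // First half of stop(): shut down the current container.
        void stopContainer() {
            if (container != null) container.shutdown();
        }

        // Second half of stop(): shut down the coordinator.
        void stopCoordinator() { coordinatorRunning = false; }
    }

    static StatelessProcessor replayScenario() {
        StatelessProcessor p = new StatelessProcessor();
        p.onNewJobModel();    // a container is running before stop() is called
        p.stopContainer();    // steps 1-2: stop() shuts down the running container
        p.onNewJobModel();    // step 3: coordinator starts a new container mid-stop
        p.stopCoordinator();  // step 4: stop() finishes by stopping the coordinator
        return p;
    }

    public static void main(String[] args) {
        StatelessProcessor p = replayScenario();
        System.out.println("coordinator running: " + p.coordinatorRunning
                + ", container running: " + p.container.running);
    }
}
```

Running `main` should print `coordinator running: false, container running: true`. The coordinator is gone, but the container started in step 3 is still alive.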

- When StreamProcessor.stop() and JobCoordinatorListener.jobModelExpired() 
execute concurrently, StreamProcessor.stop() will block indefinitely.

`paused` is a state variable held in SamzaContainer to indicate whether it has 
been paused (by default, `paused` is set to `false` in `SamzaContainer`).

1. The user thread invokes `SamzaContainer.stop()`, which triggers 
`SamzaContainer.shutdown`. At this point, the user thread is waiting for 
`onContainerStop(paused=false)` (the container-stopped callback with paused = false).
2. Before the SamzaContainer shuts down, the debounce thread invokes 
`onJobModelExpired`, which triggers `SamzaContainer.pause()`. 
`SamzaContainer.pause()` sets the SamzaContainer's local state `paused` to true.
3. The SamzaContainer shuts down and triggers the container shutdown callback 
`onContainerStop(paused=true)`. Because `paused` is true in the 
`onContainerStop` callback, the StreamProcessor shutdown sequence is not triggered. 
4. The StreamProcessor continues to participate in processor group coordination 
activities (as if shutdown had not been triggered).
5. `LocalApplicationRunner.waitForFinish` blocks indefinitely.
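The sequence above can be replayed on one thread to show why the shutdown never completes. This is a minimal sketch under stated assumptions. `PausedFlagDemo`, `Processor`, `FakeContainer`, `ContainerListener` and the `shutdownComplete` latch are hypothetical stand-ins, not Samza code. The latch plays the role of whatever a `waitForFinish`-style caller blocks on. The demo bounds that wait with a timeout so it terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, single-threaded replay of the interleaving above. Names mirror
 * the description, but these are illustrative stand-ins, NOT Samza's classes.
 */
public class PausedFlagDemo {
    interface ContainerListener { void onContainerStop(boolean paused); }

    static final class FakeContainer {
        boolean paused = false;                  // default value, as described above
        private final ContainerListener listener;
        FakeContainer(ContainerListener listener) { this.listener = listener; }
        void pause() { paused = true; }          // called from onJobModelExpired
        void shutdown() { listener.onContainerStop(paused); } // reports the flag as-is
    }

    static final class Processor implements ContainerListener {
        // What a waitForFinish-style caller would block on (hypothetical).
        final CountDownLatch shutdownComplete = new CountDownLatch(1);
        boolean leftGroup = false;
        final FakeContainer container = new FakeContainer(this);

        @Override
        public void onContainerStop(boolean paused) {
            if (paused) {
                return; // looks like a rebalance: stay in the group, skip shutdown
            }
            leftGroup = true;
            shutdownComplete.countDown();
        }
    }

    /** Replays steps 1-3; returns whether shutdown completed within the timeout. */
    static boolean replayScenario(Processor p, long timeoutMs) {
        // Step 1: the user thread has requested stop and expects onContainerStop(paused=false).
        // Step 2: the debounce thread's onJobModelExpired pauses the container first.
        p.container.pause();
        // Step 3: the container shuts down and reports paused=true.
        p.container.shutdown();
        try {
            // Steps 4-5: a real caller would wait forever; the demo bounds the wait.
            return p.shutdownComplete.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Processor p = new Processor();
        boolean finished = replayScenario(p, 200);
        System.out.println("shutdown finished: " + finished + ", left group: " + p.leftGroup);
    }
}
```

Running `main` should print `shutdown finished: false, left group: false`.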


To solve the above problems, the following changes were made to StreamProcessor:
 * Add a state field to `StreamProcessor` to represent its current state. Before 
performing any StreamProcessor operation, its current state is checked, and the 
operation is performed only if that state is valid for it.
 * Remove the `paused` state from SamzaContainer; it is now covered by the 
StreamProcessor state itself.
 * Make onJobModelExpired the start state for any custom coordinators. 
 * Interrupt the container thread on StreamProcessor.stop() if it is still 
running after the shutdown timeout.
 * Add tests to verify the behavior in StreamProcessor/LocalApplicationRunner.
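The first, second and fourth changes can be sketched together as a small lifecycle-guarded processor. This is a hypothetical illustration, not the actual patch. `GuardedProcessor`, its `State` values, the stand-in `Container` and the `stop(long timeoutMs)` signature are all assumptions. The key ordering is that `stop()` moves to `STOPPING` under the lock before touching the container, so a late `onNewJobModel` (problem 1) is rejected. Because the "paused" condition lives in the processor state (`IN_REBALANCE`), a racing `onJobModelExpired` (problem 2) is ignored once stop has begun. The onJobModelExpired start-state change for custom coordinators is not modeled here.

```java
/**
 * Hypothetical sketch of a StreamProcessor-like class that validates its
 * lifecycle state before every operation. Class, enum and method names are
 * illustrative assumptions, NOT Samza's actual API.
 */
public class GuardedProcessor {
    enum State { NEW, STARTED, IN_REBALANCE, RUNNING, STOPPING, STOPPED }

    /** Stand-in container: runs until asked to shut down or interrupted. */
    static final class Container implements Runnable {
        volatile boolean shutdownRequested;

        @Override
        public void run() {
            try {
                while (!shutdownRequested) Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit when interrupted
            }
        }
    }

    private final Object lock = new Object();
    private State state = State.NEW;
    private Container container;
    private Thread containerThread;

    public State state() {
        synchronized (lock) { return state; }
    }

    public void start() {
        synchronized (lock) {
            if (state == State.NEW) state = State.STARTED;
        }
    }

    /** Coordinator callback: start a container only while not stopping. */
    public boolean onNewJobModel() {
        synchronized (lock) {
            if (state != State.STARTED && state != State.IN_REBALANCE) return false;
            container = new Container();
            containerThread = new Thread(container, "container");
            containerThread.start();
            state = State.RUNNING;
            return true;
        }
    }

    /** Coordinator callback: the "paused" condition is processor state now. */
    public void onJobModelExpired() {
        synchronized (lock) {
            if (state != State.RUNNING) return; // ignored once stop() has begun
            state = State.IN_REBALANCE;
            container.shutdownRequested = true; // signal the current container to exit
        }
    }

    /** User-facing stop: block new containers first, then stop the current one. */
    public void stop(long timeoutMs) {
        Thread toJoin;
        synchronized (lock) {
            if (state == State.STOPPING || state == State.STOPPED) return;
            state = State.STOPPING; // from here on, onNewJobModel() is a no-op
            toJoin = containerThread;
            if (container != null) container.shutdownRequested = true;
        }
        if (toJoin != null) {
            try {
                toJoin.join(timeoutMs);
                if (toJoin.isAlive()) {
                    toJoin.interrupt(); // container overran the shutdown timeout
                    toJoin.join();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve caller's interrupt status
            }
        }
        synchronized (lock) { state = State.STOPPED; }
    }

    public static void main(String[] args) {
        GuardedProcessor p = new GuardedProcessor();
        p.start();
        p.onNewJobModel();                      // container running
        p.stop(1000);                           // user thread stops the processor
        boolean restarted = p.onNewJobModel();  // late coordinator callback
        System.out.println("restarted after stop: " + restarted + ", state: " + p.state());
    }
}
```

Running `main` should print `restarted after stop: false, state: STOPPED`. Holding the lock only for the state transition, and joining the container thread outside it, keeps coordinator callbacks from blocking behind a slow container shutdown.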



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
