[ 
https://issues.apache.org/jira/browse/SAMZA-1730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shanthoosh Venkataraman closed SAMZA-1730.
------------------------------------------

> Add state validations in StreamProcessor.
> ------------------------------------------
>
>                 Key: SAMZA-1730
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1730
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major
>
> The existing StreamProcessor implementation doesn't have a state variable to
> represent its current state (whether it's in the rebalance, shutdown, or
> running state). The absence of this information leads to the following two
> problems:
>  - When StreamProcessor.stop() and JobCoordinatorListener.onNewJobModel()
> execute concurrently, the SamzaContainer might still be running after the
> StreamProcessor has been stopped (due to the interleaved execution order).
> Here's a sample execution order:
>    1. User thread invokes `SamzaContainer.stop()`.
>    2. `StreamProcessor.stop()` stops the currently running SamzaContainer.
>    3. Before StreamProcessor stops the ZkJobCoordinator, the ZkJobCoordinator
> initializes and executes a new SamzaContainer (due to a change in the global
> processor group).
>    4. StreamProcessor stops the ZkJobCoordinator.
>  - When StreamProcessor.stop() and JobCoordinatorListener.onJobModelExpired()
> execute concurrently, the StreamProcessor is not stopped cleanly.
>    `paused` is a state variable held in SamzaContainer that indicates whether
> the container has been stopped for a new JobModel from the JobCoordinator (by
> default, `paused` is set to `false` in `SamzaContainer`). Here's a sample
> execution order:
>    1. User thread invokes `SamzaContainer.stop()` and triggers
> `SamzaContainer.shutdown`. At this point, the user thread is waiting for
> `onContainerStop(paused=false)` (the container-stopped callback with
> paused = false).
>    2. Before the SamzaContainer is shut down, the debounce thread invokes
> `onJobModelExpired` and triggers `SamzaContainer.pause()`, which sets the
> SamzaContainer's local state `paused` to true.
>    3. The SamzaContainer shuts down and triggers the container-stopped
> callback `onContainerStop(paused=true)`. Because `paused` is true in the
> onContainerStop callback, the StreamProcessor shutdown sequence is not
> triggered.
>    4. The StreamProcessor continues to participate in processor group
> coordination activities as if shutdown had never been triggered.
>    5. `LocalApplicationRunner.waitForFinish` blocks indefinitely.
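Both interleavings can be replayed deterministically on a single thread. The following is a minimal Java sketch under a drastically simplified model: `PreFixProcessor`, its fields, and `PreFixReplay` are hypothetical names, not Samza classes, and `waitForFinish` is bounded here so the demo terminates, whereas the call described above blocks indefinitely.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the pre-fix design: no processor-level state, and the
// container keeps its own `paused` flag. Not Samza's actual classes.
class PreFixProcessor {
    boolean containerRunning = true;
    boolean coordinatorRunning = true;
    boolean paused = false;  // container-local flag, false by default
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);

    // Scenario 1, step 2: stop() shuts down the running container.
    void stopContainer() {
        containerRunning = false;
    }

    // Scenario 1, step 3: with no state to consult, a new JobModel always
    // starts a container, even while stop() is in progress.
    void onNewJobModel() {
        containerRunning = true;
    }

    // Scenario 1, step 4: stop() stops the coordinator.
    void stopCoordinator() {
        coordinatorRunning = false;
    }

    // Scenario 2, step 2: onJobModelExpired pauses the container.
    void onJobModelExpired() {
        paused = true;
    }

    // Container-stopped callback: the shutdown sequence is skipped when paused.
    void onContainerStop(boolean pausedAtStop) {
        containerRunning = false;
        if (!pausedAtStop) {
            coordinatorRunning = false;
            shutdownComplete.countDown();
        }
    }

    // Stand-in for LocalApplicationRunner.waitForFinish(), bounded for the demo.
    boolean waitForFinish(long millis) {
        try {
            return shutdownComplete.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class PreFixReplay {
    public static void main(String[] args) {
        // Scenario 1: stop() interleaved with onNewJobModel().
        PreFixProcessor a = new PreFixProcessor();
        a.stopContainer();
        a.onNewJobModel();
        a.stopCoordinator();
        System.out.println("scenario 1, container running after stop: " + a.containerRunning);  // true

        // Scenario 2: stop() interleaved with onJobModelExpired().
        PreFixProcessor b = new PreFixProcessor();
        b.onJobModelExpired();        // paused flips to true mid-shutdown
        b.onContainerStop(b.paused);  // callback sees paused == true
        System.out.println("scenario 2, coordinator still running: " + b.coordinatorRunning);  // true
        System.out.println("scenario 2, waitForFinish returned: " + b.waitForFinish(100));    // false
    }
}
```

In both cases the failure comes from a decision being made without knowing that a stop is already underway, which is the information a processor-level state field would supply.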
> To solve the above problems, the following changes were made:
>  * Add a state field to `StreamProcessor` to represent its current state.
> Before any StreamProcessor operation is performed, the current state is
> checked, and the operation proceeds only if it is valid in that state.
>  * Remove the `paused` state from SamzaContainer; it is now covered by the
> StreamProcessor state itself.
>  * Make onJobModelExpired a start state for any custom JobCoordinator.
>  * On StreamProcessor.stop(), interrupt the container thread if it is still
> running after the shutdown timeout.
>  * Add tests to verify the behavior in StreamProcessor/LocalApplicationRunner.
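A minimal Java sketch of the state-guarded approach in the list above. `GuardedProcessor`, its `State` values, and its method signatures are hypothetical; they loosely follow the rebalance/shutdown/running states named in the description and are not Samza's actual `StreamProcessor` API. The container is simulated by a thread that exits only when interrupted, so `stop()` must fall back to interrupting it after the shutdown timeout.

```java
// Hypothetical sketch: state names, transitions, and signatures are assumptions
// for illustration, not Samza's actual StreamProcessor API.
class GuardedProcessor {
    enum State { NEW, STARTED, IN_REBALANCE, RUNNING, STOPPING, STOPPED }

    private final Object lock = new Object();
    private final long shutdownTimeoutMs;
    private State state = State.NEW;
    private Thread containerThread;  // thread running the simulated container

    GuardedProcessor(long shutdownTimeoutMs) {
        this.shutdownTimeoutMs = shutdownTimeoutMs;
    }

    State state() {
        synchronized (lock) { return state; }
    }

    boolean containerAlive() {
        synchronized (lock) { return containerThread != null && containerThread.isAlive(); }
    }

    void start() {
        synchronized (lock) {
            if (state == State.NEW) state = State.STARTED;
        }
    }

    // Accepted from STARTED as well as RUNNING, so a coordinator may call it
    // before the first JobModel. Ignored while stopping or stopped, which is
    // what the container-local `paused` flag used to (unreliably) decide.
    void onJobModelExpired() {
        synchronized (lock) {
            if (state == State.RUNNING && containerThread != null) {
                containerThread.interrupt();  // simulated container pause
            }
            if (state == State.STARTED || state == State.RUNNING) {
                state = State.IN_REBALANCE;
            }
        }
    }

    // Launch a container only when a new JobModel is expected; a model that
    // arrives during or after stop() is dropped instead of starting a container.
    void onNewJobModel(Runnable container) {
        synchronized (lock) {
            if (state != State.STARTED && state != State.IN_REBALANCE) return;
            containerThread = new Thread(container, "container-sketch");
            containerThread.setDaemon(true);
            containerThread.start();
            state = State.RUNNING;
        }
    }

    // Publish STOPPING under the lock so concurrent callbacks observe it, then
    // wait for the container and interrupt it if it outlives the timeout.
    void stop() {
        Thread toStop;
        synchronized (lock) {
            if (state == State.STOPPING || state == State.STOPPED) return;
            state = State.STOPPING;
            toStop = containerThread;
        }
        if (toStop != null) {
            try {
                toStop.join(shutdownTimeoutMs);  // wait for the container to exit
                if (toStop.isAlive()) {
                    toStop.interrupt();          // still running after the timeout
                    toStop.join(shutdownTimeoutMs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        synchronized (lock) {
            state = State.STOPPED;
        }
    }
}

class GuardedDemo {
    public static void main(String[] args) {
        // A container that only exits when interrupted, so stop() hits the timeout.
        Runnable stubbornContainer = () -> {
            try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException e) { /* exit */ }
        };
        GuardedProcessor p = new GuardedProcessor(100);
        p.start();
        p.onNewJobModel(stubbornContainer);
        System.out.println(p.state());           // RUNNING
        p.stop();
        System.out.println(p.state());           // STOPPED
        p.onNewJobModel(stubbornContainer);      // ignored: processor is stopped
        System.out.println(p.containerAlive());  // false
    }
}
```

The main design choice in this sketch is that `stop()` publishes `STOPPING` under the same lock that guards the coordinator callbacks, so a JobModel that arrives mid-shutdown observes that state and is dropped rather than starting a new container.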



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
