[
https://issues.apache.org/jira/browse/SAMZA-1730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shanthoosh Venkataraman closed SAMZA-1730.
------------------------------------------
> Add state validations in StreamProcessor.
> -----------------------------------------
>
> Key: SAMZA-1730
> URL: https://issues.apache.org/jira/browse/SAMZA-1730
> Project: Samza
> Issue Type: Bug
> Reporter: Shanthoosh Venkataraman
> Assignee: Shanthoosh Venkataraman
> Priority: Major
>
> The existing StreamProcessor implementation has no state variable to
> represent its current state (rebalancing, shutting down, or running).
> The absence of this information leads to the following two problems.
> - When StreamProcessor.stop() and JobCoordinatorListener.onNewJobModel()
> execute concurrently, the SamzaContainer might still be running after the
> StreamProcessor has been stopped (due to the interleaved execution order).
> Here's a sample execution order:
> 1. User thread invokes `SamzaContainer.stop()`.
> 2. `StreamProcessor.stop()` stops the currently running
> SamzaContainer.
> 3. Before StreamProcessor stops the ZkJobCoordinator,
> ZkJobCoordinator initializes and executes a new SamzaContainer (due to a
> change in the global processor group).
> 4. StreamProcessor stops the ZkJobCoordinator.
> - When StreamProcessor.stop() and JobCoordinatorListener.jobModelExpired()
> execute concurrently, the StreamProcessor will not be stopped cleanly.
> `paused` is a state variable held in SamzaContainer that indicates whether
> the container has been stopped for a new JobModel from the JobCoordinator
> (by default, `paused` is set to `false` in `SamzaContainer`). Here's a
> sample execution order:
> 1. User thread invokes `SamzaContainer.stop()` and triggers
> `SamzaContainer.shutdown`. At this point, the user thread is waiting for
> `onContainerStop(paused=false)` (the container-stopped callback with
> paused = false).
> 2. Before the SamzaContainer is shut down, the debounce thread
> invokes `onJobModelExpired` and triggers `SamzaContainer.pause()`.
> `SamzaContainer.pause()` sets the SamzaContainer local state `paused` to
> true.
> 3. SamzaContainer shuts down and triggers the container
> shutdown callback: onContainerStop (with paused = true). When paused is
> true in the onContainerStop callback, the StreamProcessor shutdown
> sequence is not triggered.
> 4. StreamProcessor continues to participate in the processor group
> coordination activities as if shutdown had not been triggered.
> 5. LocalApplicationRunner.waitForFinish blocks indefinitely.
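The second interleaving above can be replayed deterministically in a few lines. This is only a sketch: `SketchContainer`, `SketchProcessor`, and `PausedFlagRaceDemo` are hypothetical stand-ins written for illustration, not the real Samza classes, and the threads are collapsed into sequential calls in the order the steps describe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SamzaContainer; only the `paused` flag is modeled.
class SketchContainer {
    interface Listener { void onContainerStop(boolean paused); }

    private final Listener listener;
    private boolean paused = false; // defaults to false, as in the description

    SketchContainer(Listener listener) { this.listener = listener; }

    void pause() { paused = true; }

    // Reports the flag as it is when shutdown completes,
    // not as it was when stop was requested.
    void completeShutdown() { listener.onContainerStop(paused); }
}

// Hypothetical stand-in for StreamProcessor's container callback.
class SketchProcessor implements SketchContainer.Listener {
    final List<String> events = new ArrayList<>();
    boolean shutdownSequenceRan = false;

    @Override
    public void onContainerStop(boolean paused) {
        events.add("onContainerStop(paused=" + paused + ")");
        if (!paused) {
            shutdownSequenceRan = true; // the coordinator would be stopped here
        }
    }
}

class PausedFlagRaceDemo {
    static SketchProcessor replay() {
        SketchProcessor processor = new SketchProcessor();
        SketchContainer container = new SketchContainer(processor);
        // Step 1: the user thread has requested stop; shutdown is in progress.
        // Step 2: the debounce thread handles job model expiry and pauses.
        container.pause();
        // Step 3: shutdown completes and the callback carries paused = true.
        container.completeShutdown();
        return processor;
    }

    public static void main(String[] args) {
        SketchProcessor p = replay();
        System.out.println(p.events + " shutdownSequenceRan=" + p.shutdownSequenceRan);
    }
}
```

Because the callback reads the flag at completion time, the stop request is effectively overwritten by the later pause, which is why the shutdown sequence never runs in this ordering.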
> To solve the above problems, the following changes were made:
> * Add a state field to `StreamProcessor` to represent its current state.
> Before any StreamProcessor operation is performed, its current state is
> checked, and the operation proceeds only if the state is valid for it.
> * Remove the paused state from SamzaContainer, since it is covered by the
> StreamProcessor state itself.
> * Make onJobModelExpired the start state for any custom JobCoordinators.
> * Interrupt the container thread on StreamProcessor.stop() if it is still
> running after the shutdown timeout.
> * Add tests to verify the behavior in StreamProcessor/LocalApplicationRunner.
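The state-validation idea in the first bullet above might look roughly like the following. This is a minimal sketch under assumptions: the class name `GuardedProcessor`, the particular set of states, and the boolean return values are all hypothetical choices made for illustration, and the actual patch may differ in each of them. Container start/stop side effects are omitted; only the state checks are shown.

```java
// Hypothetical sketch; names and states are illustrative, not Samza's API.
class GuardedProcessor {
    enum State { NEW, STARTED, IN_REBALANCE, RUNNING, STOPPING, STOPPED }

    private final Object lock = new Object();
    private State state = State.NEW;

    State state() {
        synchronized (lock) { return state; }
    }

    /** Valid only from NEW. */
    boolean start() {
        synchronized (lock) {
            if (state != State.NEW) return false;
            state = State.STARTED;
            return true;
        }
    }

    /** Valid from STARTED or RUNNING; the container would be stopped here. */
    boolean onJobModelExpired() {
        synchronized (lock) {
            if (state != State.STARTED && state != State.RUNNING) return false;
            state = State.IN_REBALANCE;
            return true;
        }
    }

    /** Valid only from IN_REBALANCE; a new container would be launched here. */
    boolean onNewJobModel() {
        synchronized (lock) {
            if (state != State.IN_REBALANCE) return false;
            state = State.RUNNING;
            return true;
        }
    }

    /** Valid from any state that is not already STOPPING or STOPPED. */
    boolean stop() {
        synchronized (lock) {
            if (state == State.STOPPING || state == State.STOPPED) return false;
            state = State.STOPPING;
            return true;
        }
    }

    /** Container-stopped callback; finishes shutdown only after stop(). */
    boolean onContainerStop() {
        synchronized (lock) {
            if (state != State.STOPPING) return false;
            state = State.STOPPED;
            return true;
        }
    }
}
```

With a guard like this, a coordinator callback that arrives after `stop()` is rejected instead of launching a new container or masking the stop request, which addresses both interleavings described in the issue.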
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)