> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > lines 125-126
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line125>
> >
> >     Don't we need to stop the container directly here? shutdown will stop 
> > the executor from accepting any new work, but will not stop running work. 
> > In any case, wouldn't a clean shutdown here be better (e.g. for flushing 
> > state) than trying to force shutdown via the executor?
> 
> Navina Ramesh wrote:
>     This stop() method allows the user to directly stop the processor. So, it 
> should not only stop the container, but also not accept more requests on the 
> same executor instance. A stopped stream processor cannot be restarted unless 
> the user creates another instance of the processor. 
>     
>     Shutting down the executor service should trigger the shutdown hook. The 
> shutdown hook invokes the shutdown actions (flushing state, checkpointing, 
> etc.) and guarantees a clean shutdown. Is there a better way of triggering 
> the shutdown actions? 
>     
>     The alternative would be to not trigger the shutdown hook and directly 
> call all steps for shutting down the container. Right now, stopping the 
> container only "stops" the runloop from further submitting tasks. It doesn't 
> clean up anything.
> 
> Navina Ramesh wrote:
>     Right now, stopping the container only "stops" the runloop from further 
> submitting tasks. It doesn't clean up anything.
>     > I take back what I said! Exiting the run loop automatically triggers 
> the shutdown sequence.
> 
> Chris Pettitt wrote:
>     W.r.t. how to shut down, I would shut down the container and join on it. 
> I would do this before the jobCoordinator as it (jobCoordinator) must already 
> be tolerant of container stops.

I am waiting on the container to shut down fully before stopping the 
jobCoordinator and executor. I hope this is what you meant.
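
To make the ordering concrete, here is a minimal sketch. It is not the code in 
the patch: FakeContainer, awaitStop and the event strings are hypothetical 
stand-ins, and the jobCoordinator stop is reduced to a log entry. It only 
assumes that stop() is asynchronous and that the run loop runs its shutdown 
sequence when it exits.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownOrderSketch {

    // Hypothetical stand-in for a container run loop; not the SamzaContainer API.
    static class FakeContainer implements Runnable {
        private volatile boolean running = true;
        private final CountDownLatch stopped = new CountDownLatch(1);
        private final List<String> events;

        FakeContainer(List<String> events) {
            this.events = events;
        }

        @Override
        public void run() {
            try {
                while (running) {
                    Thread.sleep(5); // pretend to process messages
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Exiting the loop runs the shutdown sequence (flush state, checkpoint, ...).
                events.add("container shutdown sequence done");
                stopped.countDown();
            }
        }

        // Asynchronous: only asks the run loop to exit.
        void stop() {
            running = false;
        }

        // Blocks until the shutdown sequence has finished, or the timeout expires.
        boolean awaitStop(long timeoutMs) throws InterruptedException {
            return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    static List<String> run() throws InterruptedException {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        FakeContainer container = new FakeContainer(events);
        executor.submit(container);

        // 1. Stop the container and wait for it to finish.
        container.stop();
        if (!container.awaitStop(5000)) {
            throw new IllegalStateException("container did not stop in time");
        }
        // 2. Only then stop the job coordinator (placeholder for the real call).
        events.add("jobCoordinator stopped");
        // 3. Finally shut down the executor; no running work is left at this point.
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        events.add("executor shut down");
        return new ArrayList<>(events);
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

The point of the ordering is that executor.shutdown() only rejects new work, so 
by the time it is called the container has already finished on its own.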

Thinking a bit more on this, I feel that there is no strong need for the user 
to provide an ExecutorService. It doesn't seem to add a lot of value when the 
user cannot control the lifecycle of the executor itself. The same executor may 
be used to manage the JobCoordinator thread in the future as well. These are 
internal to Samza and shouldn't require any user intervention. Do you still 
think there is value in keeping the ExecutorService as an argument to the 
StreamProcessor constructor?


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > line 145
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line145>
> >
> >     Do we need to ensure the previous container is stopped before starting 
> > the new container? For example, would it be possible for the new container 
> > and the old container to stomp on each other's local state if they're 
> > running at the same time? container.stop appears to be asynchronous and 
> > doesn't appear to give you any guarantee about when the container is 
> > actually stopped.
> >     
> >     ---
> >     
> >     Is the JobModelUpdateHandler called from the same thread that 
> > StreamProcessor.start is? If not (and given this is a callback it's not a 
> > good assumption) you should make container volatile.
> 
> Navina Ramesh wrote:
>     In the current world, restarting a container with a job model should not 
> stomp another's local state as they are all isolated at task level. In the 
> current use-case, I am not sure what the correct approach for handling the 
> state is. We consume all ssps within the same task. So, the store, by 
> default, will be shared. We haven't scoped out stateful processing in 
> standalone world.
>     
>     JobModelUpdateHandler will be called from the JobCoordinator thread. In 
> the current use-case, this method won't be invoked as the 
> StandaloneJobCoordinator does not monitor for JobModel changes. Since 
> StreamProcessor is meant to handle pluggable JobCoordinators, it makes sense 
> to treat the updatehandler as being called from a different thread.
> 
> Chris Pettitt wrote:
>     I guess what I was getting at is: would it be possible that you get a new 
> job model for the same task? In that case you stop the task but don't wait 
> for it to stop before starting up the new instance.
>     
>     It seems safer to me to join on the container stop before starting a new 
> container.

I think stopContainer addresses this requirement. Let me know if it doesn't.
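
For illustration, a minimal sketch of the join-before-restart idea together 
with the volatile field you suggested. This is not the code in the patch: 
FakeContainer, stopAndJoin and onJobModelUpdate are hypothetical names, and the 
counters exist only to show that two containers never run at the same time.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RestartSketch {
    // Track how many fake containers are running at once (illustration only).
    private final AtomicInteger live = new AtomicInteger();
    private final AtomicInteger maxLive = new AtomicInteger();

    // Hypothetical stand-in for a container; not the SamzaContainer API.
    private class FakeContainer {
        private volatile boolean running = true;
        private final Thread thread = new Thread(this::runLoop);

        void start() {
            thread.start();
        }

        private void runLoop() {
            maxLive.accumulateAndGet(live.incrementAndGet(), Math::max);
            try {
                while (running) {
                    Thread.sleep(5); // pretend to process messages
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                live.decrementAndGet();
            }
        }

        // Ask the loop to exit, then block until its thread has finished.
        void stopAndJoin() throws InterruptedException {
            running = false;
            thread.join();
        }
    }

    // volatile: the update callback may run on a different thread than start(),
    // and other threads may read this field without taking the lock.
    private volatile FakeContainer container;

    // Hypothetical callback invoked when a new job model arrives.
    synchronized void onJobModelUpdate() throws InterruptedException {
        FakeContainer old = container;
        if (old != null) {
            old.stopAndJoin(); // join on the old container before starting a new one
        }
        FakeContainer next = new FakeContainer();
        next.start();
        container = next;
    }

    synchronized void stop() throws InterruptedException {
        FakeContainer current = container;
        if (current != null) {
            current.stopAndJoin();
            container = null;
        }
    }

    // Simulates three job model updates; returns the peak number of live containers.
    static int run() throws InterruptedException {
        RestartSketch processor = new RestartSketch();
        for (int i = 0; i < 3; i++) {
            processor.onJobModelUpdate();
        }
        processor.stop();
        return processor.maxLive.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max containers running at once: " + run());
    }
}
```

Because each restart joins on the old container first, the old and new 
instances cannot touch the same local state concurrently.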


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review139626
-----------------------------------------------------------


On July 11, 2016, 10:47 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 11, 2016, 10:47 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer 
> interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>
