> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > lines 125-126
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line125>
> >
> >     Don't we need to stop the container directly here? shutdown will stop 
> > the executor from accepting any new work, but will not stop running work. 
> > In any case, wouldn't a clean shutdown here be better (e.g. for flushing 
> > state) than trying to force shutdown via the executor?
> 
> Navina Ramesh wrote:
>     This stop() method allows the user to directly stop the processor. So, it 
> should not only stop the container, but also not accept more requests on the 
> same executor instance. A stopped stream processor cannot be restarted unless 
> the user creates another instance of the processor. 
>     
>     Shutting down the executor service should trigger the shutdown hook. The 
> shutdown hook invokes the shutdown actions (flushing state, checkpointing, etc.) 
> and guarantees a clean shutdown. Is there a better way of triggering the 
> shutdown actions?
>     
>     The alternative would be to not trigger the shutdown hook and directly 
> call all steps for shutting down the container. Right now, stopping the 
> container only "stops" the runloop from further submitting tasks. It doesn't 
> clean up anything.
> 
> Navina Ramesh wrote:
>     Right now, stopping the container only "stops" the runloop from further 
> submitting tasks. It doesn't clean up anything.
>     > I take back what I said! Exiting the run loop automatically triggers 
> the shutdown sequence.
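For reference, the `ExecutorService` behaviour Chris describes can be checked with a small standalone program. It is a sketch, not code from this patch. The class name, the `Result` type and the latch-based stand-in for a container run loop are hypothetical. Only the `java.util.concurrent` calls are real JDK API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExecutorShutdownDemo {

  /** What we observed about the executor after calling shutdown(). */
  public static final class Result {
    public final boolean rejectedNewWork;
    public final boolean terminatedWhileTaskRunning;
    public final boolean runningTaskCompleted;

    Result(boolean rejectedNewWork, boolean terminatedWhileTaskRunning, boolean runningTaskCompleted) {
      this.rejectedNewWork = rejectedNewWork;
      this.terminatedWhileTaskRunning = terminatedWhileTaskRunning;
      this.runningTaskCompleted = runningTaskCompleted;
    }
  }

  public static Result run() throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    AtomicBoolean completed = new AtomicBoolean(false);

    // Long-running task standing in for a container's run loop (hypothetical).
    executor.submit(() -> {
      started.countDown();
      try {
        release.await(); // keep "processing" until released
        completed.set(true);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // only shutdownNow() would interrupt us
      }
    });
    started.await();

    // shutdown() rejects new submissions but does NOT interrupt the running task.
    executor.shutdown();
    boolean rejected;
    try {
      executor.submit(() -> { });
      rejected = false;
    } catch (RejectedExecutionException e) {
      rejected = true;
    }

    // The executor cannot terminate while the task is still running.
    boolean terminatedEarly = executor.awaitTermination(100, TimeUnit.MILLISECONDS);

    // Let the task finish; the executor then terminates on its own.
    release.countDown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    return new Result(rejected, terminatedEarly, completed.get());
  }

  public static void main(String[] args) throws InterruptedException {
    Result r = run();
    System.out.println("rejected new work: " + r.rejectedNewWork);
    System.out.println("terminated while task running: " + r.terminatedWhileTaskRunning);
    System.out.println("running task completed: " + r.runningTaskCompleted);
  }
}
```

Only `shutdownNow()` interrupts running tasks. Even then, interruption is cooperative, so a clean container stop still needs the container's own shutdown path.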

As for how to shut down: I would shut down the container and join on it. I 
would do this before stopping the jobCoordinator, since the jobCoordinator must 
already be tolerant of container stops.
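Here is a minimal sketch of that ordering. It assumes the container runs on its own thread and that its `shutdown()` is asynchronous, meaning it only signals the run loop to exit. `FakeContainer`, `JobCoordinator` and `StreamProcessorStopSketch` are hypothetical stand-ins, not the classes in this patch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StreamProcessorStopSketch {

  /** Hypothetical stand-in for SamzaContainer: run() blocks until shutdown() is requested. */
  static final class FakeContainer implements Runnable {
    private final Object lock = new Object();
    private final List<String> events;
    private boolean stopRequested = false;

    FakeContainer(List<String> events) {
      this.events = events;
    }

    @Override
    public void run() {
      synchronized (lock) {
        while (!stopRequested) {
          try {
            lock.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
      // Stands in for the clean shutdown sequence (flush stores, write checkpoints, ...).
      events.add("container flushed and stopped");
    }

    /** Asynchronous, like container.stop: it only asks the run loop to exit. */
    void shutdown() {
      synchronized (lock) {
        stopRequested = true;
        lock.notifyAll();
      }
    }
  }

  /** Hypothetical coordinator interface; it must already tolerate container stops. */
  interface JobCoordinator {
    void stop();
  }

  private final FakeContainer container;
  private final Thread containerThread;
  private final JobCoordinator jobCoordinator;

  StreamProcessorStopSketch(List<String> events) {
    this.container = new FakeContainer(events);
    this.containerThread = new Thread(container, "samza-container");
    this.jobCoordinator = () -> events.add("job coordinator stopped");
  }

  void start() {
    containerThread.start();
  }

  /** Stop the container, wait for its shutdown sequence, then stop the coordinator. */
  void stop() throws InterruptedException {
    container.shutdown();
    containerThread.join(); // returns only after run() (and its cleanup) has finished
    jobCoordinator.stop();
  }

  public static List<String> runDemo() throws InterruptedException {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    StreamProcessorStopSketch processor = new StreamProcessorStopSketch(events);
    processor.start();
    processor.stop();
    return new ArrayList<>(events);
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runDemo());
  }
}
```

Running `main` prints `[container flushed and stopped, job coordinator stopped]`. If a hung container should not block `stop()` forever, `Thread.join(long millis)` could bound the wait.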


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > line 145
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line145>
> >
> >     Do we need to ensure the previous container is stopped before starting 
> > the new container? For example, would it be possible for the new container 
> > and the old container to stomp on each other's local state if they're 
> > running at the same time? container.stop appears to be asynchronous and 
> > doesn't appear to give you any guarantee about when the container is 
> > actually stopped.
> >     
> >     ---
> >     
> >     Is the JobModelUpdateHandler called from the same thread that 
> > StreamProcessor.start is? If not (and given this is a callback, it's not a 
> > good assumption), you should make container volatile.
> 
> Navina Ramesh wrote:
>     In the current world, restarting a container with a job model should not 
> stomp on another's local state, as they are all isolated at the task level. In the 
> current use-case, I am not sure what the correct approach for handling the 
> state is. We consume all SSPs within the same task, so the store will, by 
> default, be shared. We haven't scoped out stateful processing in the 
> standalone world.
>     
>     JobModelUpdateHandler will be called from the JobCoordinator thread. In 
> the current use-case, this method won't be invoked as the 
> StandaloneJobCoordinator does not monitor for JobModel changes. Since 
> StreamProcessor is meant to handle pluggable JobCoordinators, it makes sense 
> to treat the update handler as being called from a different thread.

I guess what I was getting at is: would it be possible that you get a new job 
model for the same task? In that case you stop the task but don't wait for it 
to stop before starting up the new instance.

It seems safer to me to join on the container stop before starting a new 
container.
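One way to get that guarantee is sketched below. It assumes each container runs on its own thread and that job model updates arrive through a callback like `onJobModelUpdate`, which is a hypothetical name. Chris suggests a `volatile` field, but this sketch instead guards `container` with the object's monitor. `synchronized` gives the same cross-thread visibility and also keeps `stop()` from interleaving with an update.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class JobModelUpdateSketch {

  /** Hypothetical container: records start/stop and blocks in run() until shutdown(). */
  static final class FakeContainer implements Runnable {
    private final String name;
    private final List<String> events;
    private final Object lock = new Object();
    private boolean stopRequested = false;

    FakeContainer(String name, List<String> events) {
      this.name = name;
      this.events = events;
    }

    @Override
    public void run() {
      events.add(name + " started");
      synchronized (lock) {
        while (!stopRequested) {
          try {
            lock.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
      events.add(name + " stopped"); // stands in for flushing local state
    }

    void shutdown() {
      synchronized (lock) {
        stopRequested = true;
        lock.notifyAll();
      }
    }
  }

  private final List<String> events;
  // Guarded by "this": written from the JobCoordinator thread, read from the user's stop() thread.
  private FakeContainer container;
  private Thread containerThread;

  JobModelUpdateSketch(List<String> events) {
    this.events = events;
  }

  /** Hypothetical callback, invoked on the JobCoordinator's thread with a new job model. */
  synchronized void onJobModelUpdate(String jobModelId) throws InterruptedException {
    stopCurrentContainer(); // old container is fully stopped before the new one starts
    container = new FakeContainer("container-" + jobModelId, events);
    containerThread = new Thread(container, "samza-container-" + jobModelId);
    containerThread.start();
  }

  synchronized void stop() throws InterruptedException {
    stopCurrentContainer();
  }

  private void stopCurrentContainer() throws InterruptedException {
    if (container != null) {
      container.shutdown();
      containerThread.join();
      container = null;
      containerThread = null;
    }
  }

  public static List<String> runDemo() throws InterruptedException {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    JobModelUpdateSketch processor = new JobModelUpdateSketch(events);
    processor.onJobModelUpdate("1");
    processor.onJobModelUpdate("2"); // same task, new job model
    processor.stop();
    return new ArrayList<>(events);
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runDemo());
  }
}
```

`runDemo()` returns `[container-1 started, container-1 stopped, container-2 started, container-2 stopped]`. The join ensures the old container has finished its cleanup before the new one starts.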


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java,
> >  line 44
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428182#file1428182line44>
> >
> >     If this is write-once I would move this to the constructor and make it 
> > final. Otherwise, does this need to be volatile? It's hard to tell, as it is 
> > not clear how it is used. It might be worth noting in the class docs that 
> > this class is not thread safe.
> 
> Navina Ramesh wrote:
>     Not sure if I can move it to the constructor. It is written by whichever 
> thread is "registering" as JobModelUpdateHandler. FYI, we are not using 
> JobModelUpdateHandler in StandaloneJobCoordinator, even though it may be 
> assigned. I can actually make "registerJobModelUpdateHandler" a no-op and 
> get rid of this variable entirely.

Looks like you made setJobModelUpdateHandler a no-op per your comment below, so 
we can just remove this.
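If a coordinator does need a handler later, the two options from the first comment look roughly like this: a final field set in the constructor, or a volatile field set later. The handler type and coordinator classes are hypothetical simplifications, not Samza's interfaces. Note that the settable variant silently drops updates published before registration.

```java
import java.util.concurrent.atomic.AtomicReference;

public class HandlerFieldSketch {

  /** Hypothetical callback type standing in for JobModelUpdateHandler. */
  interface JobModelUpdateHandler {
    void onJobModelUpdate(String jobModelId);
  }

  /** Option 1: write-once, injected at construction, so the field can be final. */
  static final class CoordinatorWithFinalHandler {
    private final JobModelUpdateHandler handler;

    CoordinatorWithFinalHandler(JobModelUpdateHandler handler) {
      this.handler = handler;
    }

    void publish(String jobModelId) {
      handler.onJobModelUpdate(jobModelId);
    }
  }

  /** Option 2: settable after construction, so the field is volatile for cross-thread visibility. */
  static final class CoordinatorWithSettableHandler {
    private volatile JobModelUpdateHandler handler;

    void setJobModelUpdateHandler(JobModelUpdateHandler handler) {
      this.handler = handler;
    }

    void publish(String jobModelId) {
      JobModelUpdateHandler h = handler; // read the volatile field once
      if (h != null) {
        h.onJobModelUpdate(jobModelId);
      }
    }
  }

  public static String demo() {
    AtomicReference<String> seen = new AtomicReference<>("none");
    new CoordinatorWithFinalHandler(seen::set).publish("model-1");
    String first = seen.get();

    CoordinatorWithSettableHandler settable = new CoordinatorWithSettableHandler();
    settable.publish("model-2"); // no handler registered yet: silently dropped
    settable.setJobModelUpdateHandler(seen::set);
    settable.publish("model-3");
    return first + "," + seen.get();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // model-1,model-3
  }
}
```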


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java,
> >  lines 92-94
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428182#file1428182line92>
> >
> >     How is this used? It seems to be write only?
> 
> Navina Ramesh wrote:
>     Yes. It is write-only. I will make this a no-op!

It would be best to remove this method from the interface altogether, as it 
does not apply to all concrete classes. If it needs to be baked in for some 
reason, then I would suggest throwing an UnsupportedOperationException; 
otherwise, callers may be confused when they never receive updates. The former 
approach is much preferred, though.
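Both approaches are sketched below. The interface names are hypothetical, and `JobModelAwareCoordinator` is not part of this patch. The preferred approach keeps the setter off the base interface so callers can test for the capability. The fallback throws `UnsupportedOperationException` instead of silently ignoring the handler.

```java
public class CoordinatorInterfaceSketch {

  /** Hypothetical stand-in for JobModelUpdateHandler. */
  interface JobModelUpdateHandler {
    void onJobModelUpdate(String jobModelId);
  }

  // Preferred: the base interface carries only what every coordinator supports...
  interface JobCoordinator {
    void start();

    void stop();
  }

  // ...and coordinators that really watch for job model changes opt in to a narrower interface.
  interface JobModelAwareCoordinator extends JobCoordinator {
    void setJobModelUpdateHandler(JobModelUpdateHandler handler);
  }

  static final class StandaloneCoordinator implements JobCoordinator {
    @Override public void start() { }
    @Override public void stop() { }
  }

  // Fallback: if the standalone coordinator must implement the handler-bearing interface
  // anyway, fail loudly rather than accept a handler that will never be called.
  static final class StandaloneCoordinatorWithFallback implements JobModelAwareCoordinator {
    @Override public void start() { }
    @Override public void stop() { }

    @Override
    public void setJobModelUpdateHandler(JobModelUpdateHandler handler) {
      throw new UnsupportedOperationException(
          "StandaloneJobCoordinator does not publish job model updates");
    }
  }

  public static String demo() {
    JobCoordinator coordinator = new StandaloneCoordinator();
    // A caller such as StreamProcessor can check for the capability instead of assuming it.
    String registration = (coordinator instanceof JobModelAwareCoordinator)
        ? "registered" : "not supported";

    String fallback;
    try {
      new StandaloneCoordinatorWithFallback().setJobModelUpdateHandler(id -> { });
      fallback = "accepted";
    } catch (UnsupportedOperationException e) {
      fallback = "rejected";
    }
    return registration + "," + fallback;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // not supported,rejected
  }
}
```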


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review139626
-----------------------------------------------------------


On June 28, 2016, 8:20 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated June 28, 2016, 8:20 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer 
> interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>
