> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > lines 125-126
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line125>
> >
> >     Don't we need to stop the container directly here? shutdown will stop 
> > the executor from accepting any new work, but will not stop running work. 
> > In any case, wouldn't a clean shutdown here be better (e.g. for flushing 
> > state) than trying to force shutdown via the executor?

This stop() method lets the user stop the processor directly. So, it should 
not only stop the container but also prevent the same executor instance from 
accepting more work. A stopped stream processor cannot be restarted; the user 
must create a new instance of the processor.
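
A minimal sketch of that one-way lifecycle, assuming a simple three-state model; the class and state names are hypothetical and not part of this patch. A compare-and-set lets `start()` succeed only from `NEW`, so a stopped instance rejects any restart.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: the class and states are illustrative, not Samza's API.
public class OneShotLifecycleSketch {
  public enum State { NEW, RUNNING, STOPPED }

  private final AtomicReference<State> state = new AtomicReference<>(State.NEW);

  // Only a NEW instance may start; a STOPPED one can never run again.
  public void start() {
    if (!state.compareAndSet(State.NEW, State.RUNNING)) {
      throw new IllegalStateException("Cannot start a processor in state " + state.get());
    }
    // Submit the container to the executor here.
  }

  // Moves to STOPPED from any state; only a RUNNING instance has work to tear down.
  public void stop() {
    State previous = state.getAndSet(State.STOPPED);
    if (previous == State.RUNNING) {
      // Stop the container and shut down the executor here.
    }
  }

  public State state() {
    return state.get();
  }

  public static void main(String[] args) {
    OneShotLifecycleSketch processor = new OneShotLifecycleSketch();
    processor.start();
    processor.stop();
    try {
      processor.start();
    } catch (IllegalStateException e) {
      System.out.println("restart rejected: " + e.getMessage());
    }
  }
}
```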

Shutting down the executor service should trigger the shutdown hook. The 
shutdown hook invokes the shutdown actions (flushing state, checkpointing, etc.) 
and guarantees a clean shutdown. Is there a better way of triggering the 
shutdown actions?

The alternative would be to skip the shutdown hook and call each step for 
shutting down the container directly. Right now, stopping the container only 
"stops" the run loop from submitting further tasks; it doesn't clean up anything.
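
For comparison, a minimal sketch of the direct-stop alternative, assuming a container that can be asked to stop and then cleans up on its own thread. `SketchContainer` and `SketchProcessor` are hypothetical stand-ins, not classes from this patch. Per the `ExecutorService` contract, `shutdown()` only stops the executor from accepting new tasks and does not interrupt tasks already running, so the sketch tells the container to stop first and then uses `awaitTermination` to wait for it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CleanStopSketch {
  // Hypothetical stand-in for SamzaContainer: run() blocks until stop() is
  // called, then does its cleanup before returning.
  static class SketchContainer implements Runnable {
    private final CountDownLatch stopRequested = new CountDownLatch(1);
    volatile boolean cleanedUp = false;

    @Override
    public void run() {
      try {
        stopRequested.await();   // stands in for the run loop
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        cleanedUp = true;        // stands in for flushing state and checkpointing
      }
    }

    void stop() {
      stopRequested.countDown();
    }
  }

  // Hypothetical processor: stop() asks the container to exit cleanly first,
  // then shuts the executor down and waits for the container thread to finish.
  static class SketchProcessor {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    final SketchContainer container = new SketchContainer();

    void start() {
      executor.submit(container);
    }

    boolean stop(long timeout, TimeUnit unit) {
      container.stop();          // ends the running work
      executor.shutdown();       // rejects new work; does NOT interrupt running tasks
      try {
        return executor.awaitTermination(timeout, unit);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  public static void main(String[] args) {
    SketchProcessor processor = new SketchProcessor();
    processor.start();
    boolean terminated = processor.stop(5, TimeUnit.SECONDS);
    System.out.println("terminated=" + terminated + ", cleanedUp=" + processor.container.cleanedUp);
  }
}
```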


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > line 145
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line145>
> >
> >     Do we need to ensure the previous container is stopped before starting 
> > the new container? For example, would it be possible for the new container 
> > and the old container to stomp on each other's local state if they're 
> > running at the same time? container.stop appears to be asynchronous and 
> > doesn't appear to give you any guarantee about when the container is 
> > actually stopped.
> >     
> >     ---
> >     
> >     Is the JobModelUpdateHandler called from the same thread that 
> > StreamProcessor.start is? If not (and given this is a callback, it's not a 
> > good assumption), you should make container volatile.

In the current world, restarting a container with a job model should not stomp 
on another container's local state, since state is isolated at the task level. 
For the current use case, I am not sure what the correct approach to handling 
state is. We consume all SSPs within the same task, so the store will be shared 
by default. We haven't scoped out stateful processing in the standalone world.

JobModelUpdateHandler will be called from the JobCoordinator thread. In the 
current use case, this method won't be invoked, because StandaloneJobCoordinator 
does not monitor for JobModel changes. Since StreamProcessor is meant to support 
pluggable JobCoordinators, it makes sense to assume the update handler is 
called from a different thread.
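
A minimal sketch of treating the update handler as a cross-thread callback, assuming the processor swaps in a new container reference when notified. `ModelUpdateHandler`, `Container` and `Processor` are hypothetical names; the `volatile` field is what makes a reference written on the coordinator's thread visible to other threads.

```java
public class VolatileContainerSketch {
  // Hypothetical callback interface, loosely modeled on JobModelUpdateHandler.
  interface ModelUpdateHandler {
    void onJobModelUpdate(String jobModel);
  }

  // Hypothetical container stand-in that remembers the job model it was built from.
  static final class Container {
    final String jobModel;

    Container(String jobModel) {
      this.jobModel = jobModel;
    }
  }

  static final class Processor implements ModelUpdateHandler {
    // volatile: written on the coordinator's callback thread, read on the caller's thread.
    private volatile Container container;

    Processor(String initialJobModel) {
      this.container = new Container(initialJobModel);
    }

    @Override
    public void onJobModelUpdate(String jobModel) {
      // Stopping the old container before the swap is out of scope for this sketch.
      container = new Container(jobModel);
    }

    String currentJobModel() {
      return container.jobModel;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Processor processor = new Processor("model-v1");
    Thread coordinatorThread = new Thread(() -> processor.onJobModelUpdate("model-v2"));
    coordinatorThread.start();
    coordinatorThread.join();
    System.out.println(processor.currentJobModel()); // prints "model-v2"
  }
}
```

Note that `volatile` only guarantees visibility of the latest reference. If a stop-then-start sequence triggered by an update can overlap with `start()`, the two would also need a lock or some other form of mutual exclusion.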


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java,
> >  line 44
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428182#file1428182line44>
> >
> >     If this is write-once I would move this to the constructor and make it 
> > final. Otherwise, does this need to be volatile? It's hard to tell, as it is 
> > not clear how it is used. It might be worth noting in the class docs that 
> > this class is not thread safe.

I'm not sure I can move it to the constructor. It is written by whichever 
thread registers as the JobModelUpdateHandler. FYI, StandaloneJobCoordinator 
does not use the JobModelUpdateHandler, even though one may be assigned. I can 
make registerJobModelUpdateHandler a no-op and get rid of this variable 
entirely.
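
A minimal sketch of the no-op option, assuming a coordinator interface shaped roughly like the one in this patch; the interfaces and `getJobModel()` below are hypothetical. With no handler field there is nothing to make `volatile`, and the write-once job model itself can be `final`.

```java
public class NoOpRegistrationSketch {
  // Hypothetical interfaces; the real ones in this patch may differ.
  interface JobModelUpdateHandler {
    void onJobModelUpdate(String jobModel);
  }

  interface JobCoordinator {
    void registerJobModelUpdateHandler(JobModelUpdateHandler handler);

    String getJobModel();
  }

  static final class StandaloneJobCoordinatorSketch implements JobCoordinator {
    private final String jobModel; // write-once, so it can be final

    StandaloneJobCoordinatorSketch(String jobModel) {
      this.jobModel = jobModel;
    }

    // The job model never changes in standalone mode, so nothing would ever be
    // called back; registration is a no-op and no handler field is kept.
    @Override
    public void registerJobModelUpdateHandler(JobModelUpdateHandler handler) {
      // intentionally empty
    }

    @Override
    public String getJobModel() {
      return jobModel;
    }
  }

  public static void main(String[] args) {
    JobCoordinator coordinator = new StandaloneJobCoordinatorSketch("static-model");
    coordinator.registerJobModelUpdateHandler(model -> System.out.println("never printed"));
    System.out.println(coordinator.getJobModel()); // prints "static-model"
  }
}
```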


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java,
> >  lines 60-61
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428182#file1428182line60>
> >
> >     I would suggest using a block here versus a single statement. It is 
> > easy to break, e.g.:
> >     
> >     ```
> >         if (systemFactoryClassName == null)
> >             log.error("error message");
> >             throw new SamzaException("error message");
> >     ```

Made this change
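
For reference, a sketch of the braced form, assuming a map-backed config and a stand-in `ConfigException`; the config key and class name in `main` are illustrative placeholders, not values from this patch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

public class BracedCheckSketch {
  private static final Logger LOG = Logger.getLogger(BracedCheckSketch.class.getName());

  // Hypothetical stand-in for the config exception type used in the patch.
  static class ConfigException extends RuntimeException {
    ConfigException(String message) {
      super(message);
    }
  }

  static String requireConfig(Map<String, String> config, String key) {
    String systemFactoryClassName = config.get(key);
    // With braces, both the log call and the throw belong to the if; without them,
    // only the first statement would, whatever the indentation suggests.
    if (systemFactoryClassName == null) {
      String message = "Missing required config: " + key;
      LOG.severe(message);
      throw new ConfigException(message);
    }
    return systemFactoryClassName;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    // Illustrative key and value only.
    config.put("systems.kafka.samza.factory", "org.example.MySystemFactory");
    System.out.println(requireConfig(config, "systems.kafka.samza.factory"));
  }
}
```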


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java,
> >  lines 92-94
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428182#file1428182line92>
> >
> >     How is this used? It seems to be write only?

Yes. It is write-only. I will make this a no-op!


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java,
> >  line 50
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428190#file1428190line50>
> >
> >     To future-proof this a bit, you could use 
> > AllSspToSingleTaskGrouperFactory.class.getName(). Same for the one below.

Got it.
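
A sketch of the suggestion, assuming a map-backed test config. The nested `AllSspToSingleTaskGrouperFactory` is a hypothetical stand-in for the real class, and the config key is assumed for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class ClassNameConfigSketch {
  // Hypothetical stand-in; in the real test this would be the imported
  // org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory.
  static class AllSspToSingleTaskGrouperFactory {
  }

  // Assumed config key, for illustration only.
  static final String SSP_GROUPER_FACTORY_KEY = "job.systemstreampartition.grouper.factory";

  static Map<String, String> buildConfig() {
    Map<String, String> config = new HashMap<>();
    // A class literal instead of a string literal: renaming or moving the class
    // breaks compilation (or is updated by refactoring tools) instead of failing at runtime.
    config.put(SSP_GROUPER_FACTORY_KEY, AllSspToSingleTaskGrouperFactory.class.getName());
    return config;
  }

  public static void main(String[] args) {
    System.out.println(buildConfig().get(SSP_GROUPER_FACTORY_KEY));
  }
}
```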


> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala, lines 
> > 107-109
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428186#file1428186line107>
> >
> >     Should this ensure that stop is complete before returning? 
> > Alternatively if we want to allow stop to be async, should we provide a way 
> > to wait for it?

Yes, there is no guarantee here that stop will actually stop the run loop. 
Most of your comments arise from the lack of proper lifecycle methods for 
SamzaContainer. I think we should define the container lifecycle handlers 
before fixing it here.
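
Until those lifecycle methods exist, one way to let callers wait for an asynchronous stop is a latch that the loop counts down on exit. This is a hypothetical Java sketch, not `RunLoop`'s actual (Scala) API; `awaitStop` is an invented name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch: stop() stays asynchronous, and awaitStop() lets a
// caller wait until run() has really returned.
public class AwaitableRunLoopSketch implements Runnable {
  private volatile boolean shutdownNow = false;
  private final CountDownLatch stopped = new CountDownLatch(1);

  @Override
  public void run() {
    try {
      while (!shutdownNow) {
        // One iteration of choose/process/commit would go here.
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
      }
    } finally {
      stopped.countDown(); // signal waiters even if the loop exits with an exception
    }
  }

  // Requests a stop and returns immediately.
  public void stop() {
    shutdownNow = true;
  }

  // Returns true once run() has returned, or false if the timeout elapses first.
  public boolean awaitStop(long timeout, TimeUnit unit) {
    try {
      return stopped.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    AwaitableRunLoopSketch loop = new AwaitableRunLoopSketch();
    Thread loopThread = new Thread(loop, "run-loop");
    loopThread.start();
    loop.stop();
    System.out.println("stopped=" + loop.awaitStop(5, TimeUnit.SECONDS));
  }
}
```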


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review139626
-----------------------------------------------------------


On June 23, 2016, 1:14 a.m., Navina Ramesh wrote:
> 
> (Updated June 23, 2016, 1:14 a.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer 
> interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 0aee4ced23ba730cca628fd1a59831007d348f56
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>
