> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java,
> > lines 125-126
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line125>
> >
> > Don't we need to stop the container directly here? shutdown will stop
> > the executor from accepting any new work, but will not stop running work.
> > In any case, wouldn't a clean shutdown here be better (e.g. for flushing
> > state) than trying to force shutdown via the executor?
This stop() method lets the user stop the processor directly, so it should not only stop the container but also prevent the executor instance from accepting more requests. A stopped stream processor cannot be restarted; the user has to create a new instance of the processor. Shutting down the executor service should trigger the shutdown hook, which invokes the shutdown actions (flushing state, checkpointing, etc.) and guarantees a clean shutdown. Is there a better way of triggering the shutdown actions? The alternative would be to bypass the shutdown hook and call each container shutdown step directly. Right now, stopping the container only "stops" the run loop from submitting further tasks; it doesn't clean anything up.

> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java,
> > line 145
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line145>
> >
> > Do we need to ensure the previous container is stopped before starting
> > the new container? For example, would it be possible for the new container
> > and the old container to stomp on each other's local state if they're
> > running at the same time? container.stop appears to be asynchronous and
> > doesn't appear to give you any guarantee about when the container is
> > actually stopped.
> >
> > ---
> >
> > Is the JobModelUpdateHandler called from the same thread that
> > StreamProcessor.start is? If not (and given this is a callback it's not a
> > good assumption) you should make container volatile.

In the current world, restarting a container with a job model should not stomp on another container's local state, since local state is isolated at the task level. In the current use case, however, I am not sure what the correct approach for handling state is: we consume all SSPs within the same task, so the store will be shared by default. We haven't scoped out stateful processing in the standalone world.
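Chris's concerns (ExecutorService.shutdown() does not stop running work, container.stop is asynchronous with no completion guarantee, and the container reference may be replaced from another thread) can be illustrated with a minimal sketch. Everything below is hypothetical: SketchContainer stands in for SamzaContainer, ProcessorSketch for StreamProcessor, and onJobModelUpdate for the JobModelUpdateHandler callback; none of it reflects the classes in this patch. The container exposes an awaitable stop backed by a CountDownLatch, the update callback stops the old container and waits for it before submitting the new one, and the container field is volatile so an unsynchronized reader on another thread sees the latest reference. Per the JDK docs, shutdown() only stops the executor from accepting new tasks and neither interrupts nor waits for running ones, so stop() asks the container to finish first and then calls awaitTermination().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for SamzaContainer: stop() is asynchronous but awaitable. */
class SketchContainer implements Runnable {
  private final String id;
  private volatile boolean stopRequested = false;
  private final CountDownLatch stopped = new CountDownLatch(1);

  SketchContainer(String id) {
    this.id = id;
  }

  String id() {
    return id;
  }

  @Override
  public void run() {
    try {
      while (!stopRequested) {
        Thread.sleep(5); // stand-in for one run-loop iteration
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      // Stand-in for clean shutdown actions (flush stores, write checkpoints).
      stopped.countDown();
    }
  }

  /** Requests a stop and returns immediately. */
  void stop() {
    stopRequested = true;
  }

  /** Blocks until run() has finished its shutdown actions; false on timeout or interrupt. */
  boolean awaitStop(long timeout, TimeUnit unit) {
    try {
      return stopped.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}

/** Hypothetical stand-in for StreamProcessor. */
class ProcessorSketch {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  // Replaced from the coordinator's callback thread and read without a lock in
  // currentContainerId(), so it is volatile.
  private volatile SketchContainer container;

  synchronized void start(SketchContainer initial) {
    container = initial;
    executor.submit(initial);
  }

  /** Hypothetical job-model-update callback: the old container must fully stop first. */
  synchronized void onJobModelUpdate(SketchContainer next) {
    SketchContainer old = container;
    if (old != null) {
      old.stop();
      if (!old.awaitStop(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Container " + old.id() + " did not stop in time");
      }
    }
    container = next;
    executor.submit(next);
  }

  String currentContainerId() {
    SketchContainer c = container;
    return c == null ? null : c.id();
  }

  /** Clean stop: let the container finish, then stop accepting work and wait for the executor. */
  synchronized boolean stop(long timeout, TimeUnit unit) {
    SketchContainer current = container;
    if (current != null) {
      current.stop();
      current.awaitStop(timeout, unit);
    }
    executor.shutdown(); // rejects new tasks; does not interrupt running ones
    try {
      return executor.awaitTermination(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Usage: call p.start(a), then p.onJobModelUpdate(b), then p.stop(5, TimeUnit.SECONDS). Container b is only submitted after a has run its shutdown actions, and stop() returns true only once the executor has terminated. Holding the lock while waiting keeps lifecycle transitions serialized, at the cost of blocking other lifecycle callers for up to the timeout.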
JobModelUpdateHandler will be called from the JobCoordinator thread. In the current use case, this method won't be invoked, because StandaloneJobCoordinator does not monitor for JobModel changes. Since StreamProcessor is meant to support pluggable JobCoordinators, it makes sense to treat the update handler as being called from a different thread.

> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java,
> > line 44
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428182#file1428182line44>
> >
> > If this is write-once I would move this to the constructor and make it
> > final. Otherwise does this need to be volatile? It's hard to tell as it is
> > not clear how it is used. It might be worth noting in the class docs that
> > this class is not thread safe.

I'm not sure I can move it to the constructor: it is written by whichever thread registers the JobModelUpdateHandler. FYI, StandaloneJobCoordinator does not use the JobModelUpdateHandler, even though one may be assigned. I can actually make "registerJobModelUpdateHandler" a no-op and get rid of this variable entirely.

> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java,
> > lines 60-61
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428182#file1428182line60>
> >
> > I would suggest using a block here versus a single statement. It is
> > easy to break, e.g.:
> >
> > ```
> > if (systemFactoryClassName == null)
> >     log.error("error message");
> >     throw new SamzaException("error message");
> > ```

Made this change.

> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java,
> > lines 92-94
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428182#file1428182line92>
> >
> > How is this used? It seems to be write only?

Yes, it is write-only.
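The StandaloneJobCoordinator changes proposed in this part of the thread (a braced null check, and a no-op registerJobModelUpdateHandler that removes the write-only field) could look roughly like the following self-contained sketch. UpdateHandler, ConfigValidationException, and StandaloneCoordinatorSketch are hypothetical stand-ins for JobModelUpdateHandler, SamzaException, and StandaloneJobCoordinator, and the config key passed to the constructor is an assumption; only the method name registerJobModelUpdateHandler comes from the review.

```java
import java.util.Map;

/** Hypothetical stand-in for JobModelUpdateHandler; the real interface is not shown in this review. */
interface UpdateHandler {
  void onUpdate(Object newJobModel);
}

/** Hypothetical stand-in for SamzaException. */
class ConfigValidationException extends RuntimeException {
  ConfigValidationException(String message) {
    super(message);
  }
}

/** Hypothetical sketch of the two StandaloneJobCoordinator review fixes. */
class StandaloneCoordinatorSketch {
  // Written once in the constructor, so it can be final rather than volatile.
  private final String systemFactoryClassName;

  StandaloneCoordinatorSketch(Map<String, String> config, String factoryKey) {
    String factory = config.get(factoryKey);
    // Braces keep the log call and the throw in the same branch. In the unbraced
    // form quoted in the review, only the log call is conditional and the throw
    // always runs.
    if (factory == null) {
      System.err.println("Missing required config: " + factoryKey);
      throw new ConfigValidationException("Missing required config: " + factoryKey);
    }
    this.systemFactoryClassName = factory;
  }

  String systemFactoryClassName() {
    return systemFactoryClassName;
  }

  /** Standalone mode never publishes JobModel changes, so registration is a deliberate no-op. */
  void registerJobModelUpdateHandler(UpdateHandler handler) {
    // Intentionally empty: no write-only handler field is kept, so there is
    // nothing that needs to be volatile or documented as thread-unsafe.
  }
}
```

Dropping the field, rather than making it volatile, sidesteps the visibility question entirely while the standalone coordinator has no use for the handler.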
I will make this a no-op!

> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java,
> > line 50
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428190#file1428190line50>
> >
> > To future proof this a bit you could use
> > AllSspToSingleTaskGrouperFactory.class.getName(). Same for the one below.

Got it.

> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala, lines
> > 107-109
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428186#file1428186line107>
> >
> > Should this ensure that stop is complete before returning?
> > Alternatively if we want to allow stop to be async, should we provide a way
> > to wait for it?

Yeah. There is no guarantee here that stop will successfully stop the run loop. Most of the comments you have provided arise from the lack of proper lifecycle methods for SamzaContainer. I think we should define the container lifecycle handlers before fixing it here.

- Navina

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review139626
-----------------------------------------------------------

On June 23, 2016, 1:14 a.m., Navina Ramesh wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
>
> (Updated June 23, 2016, 1:14 a.m.)
>
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
>
> Repository: samza
>
> Description
> -------
>
> Added ConfigBuilder and support classes
>
> Added JobCoordinator interfaces
>
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
>
> Added TestStreamProcessor and some unit tests for ConfigBuilders
>
> Changing who defined processorId
>
> Fixed checkstyle errors
>
> Replaced SamzaException with ConfigException
>
> Removing localityManager instantiation from Samza Container
>
> Diffs
> -----
>
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 0aee4ced23ba730cca628fd1a59831007d348f56
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff
>
> Diff: https://reviews.apache.org/r/48356/diff/
>
> Testing
> -------
>
> ./gradlew clean build
>
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
>
> Thanks,
>
> Navina Ramesh
>