mynameborat commented on a change in pull request #1213: [WIP] SAMZA-2305: 
Stream processor should ensure previous container is stopped during a rebalance
URL: https://github.com/apache/samza/pull/1213#discussion_r344404613
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
 ##########
 @@ -461,8 +483,25 @@ public void onJobModelExpired() {
             } else {
               LOGGER.info("Container: {} shutdown completed for stream 
processor: {}.", container, processorId);
             }
+          } else if (state == State.IN_REBALANCE) {
+            if (container != null) {
+              boolean hasContainerShutdown = interruptSamzaContainer();
 
 Review comment:
   I agree that, it shuts down the executor service and the call site can 
instantiate a new one potentially based on its result. It currently shuts it 
down or not based on `container != null` and maybe i will extract that out from 
the method and establish a clear contract on this method where it will always 
invoke shutdownNow. By that, it is clear on the call site that upon invocation 
of this method, if executor service needs to used a new one needs to be 
created. 
   
   I prefer to use immutable method parameters wherever possible since its 
easier to reason about and also not worry about the side effects. In this case, 
this method is instance method anyways and IMO, I don't quite understand the 
benefit of passing the containerExecutorService as a method parameter.
   
   As far as creating an executor service upon success, I am debating whether 
to do it here vs create a new executor service in `onNewJobModel`. wdyt?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to