mynameborat commented on a change in pull request #1478:
URL: https://github.com/apache/samza/pull/1478#discussion_r602601084



##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
##########
@@ -392,6 +395,78 @@ StartpointManager createStartpointManager() {
     return new StartpointManager(coordinatorStreamStore);
   }
 
+  /**
+   * Check if the new job model contains a different work assignment for the 
processor compared the last active job
+   * model. In case of different work assignment, expire the current job model 
by invoking the <i>onJobModelExpired</i>
+   * on the registered {@link JobCoordinatorListener}.
+   * At this phase, the job model is yet to be agreed by the quorum and hence, 
this optimization helps availability of
+   * the processors in the event no changes in the work assignment.
+   *
+   * @param newJobModel new job model published by the leader
+   */
+  @VisibleForTesting
+  void checkAndExpireJobModel(JobModel newJobModel) {
+    Preconditions.checkNotNull(newJobModel, "JobModel cannot be null");
+    if (coordinatorListener == null) {
+      LOG.info("Skipping job model expiration since there are no active 
listeners");
+      return;
+    }
+
+    if (JobModelUtil.compareContainerModelForProcessor(processorId, 
activeJobModel, newJobModel)) {
+      LOG.info("Skipping job model expiration for processor {} due to no 
change in work assignment.", processorId);
+    } else {
+      coordinatorListener.onJobModelExpired();
+    }
+  }
+
+  /**
+   * Checks if the new job model contains a different work assignment for the 
processor compared to the last active
+   * job model. In case of different work assignment, update the task locality 
of the tasks associated with the
+   * processor and notify new job model to the registered {@link 
JobCoordinatorListener}.
+   *
+   * @param newJobModel new job model agreed by the quorum
+   */
+  @VisibleForTesting
+  void onNewJobModel(JobModel newJobModel) {
+    Preconditions.checkNotNull(newJobModel, "JobModel cannot be null. Failing 
onNewJobModel");
+    // start the container with the new model
+    if (!JobModelUtil.compareContainerModelForProcessor(processorId, 
activeJobModel, newJobModel)) {
+      if (newJobModel.getContainers().containsKey(processorId)) {

Review comment:
       The implication of work assignment remaining the same can be categorized 
into 
   1. Processor part of the job model
   2. Processor not part of the job model.
   
   For both the state of the processor remains what it was when the rebalance 
started. e.g., 
   [1] should continue to process its work assignment without any interruption 
as part of the rebalance. i.e., there will be no expiration of the existing 
work (a.k.a samza container won't be stopped) and also no notification to 
`StreamProcessor` about the rebalance since work assignment didn't change.
   [2] should have no work and be idle processor and will continue to be idle. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to