sborya commented on a change in pull request #1478:
URL: https://github.com/apache/samza/pull/1478#discussion_r602592234



##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
##########
@@ -392,6 +395,78 @@ StartpointManager createStartpointManager() {
     return new StartpointManager(coordinatorStreamStore);
   }
 
+  /**
+   * Check if the new job model contains a different work assignment for the processor compared the last active job
+   * model. In case of different work assignment, expire the current job model by invoking the <i>onJobModelExpired</i>
+   * on the registered {@link JobCoordinatorListener}.
+   * At this phase, the job model is yet to be agreed by the quorum and hence, this optimization helps availability of
+   * the processors in the event no changes in the work assignment.
+   *
+   * @param newJobModel new job model published by the leader
+   */
+  @VisibleForTesting
+  void checkAndExpireJobModel(JobModel newJobModel) {
+    Preconditions.checkNotNull(newJobModel, "JobModel cannot be null");
+    if (coordinatorListener == null) {
+      LOG.info("Skipping job model expiration since there are no active listeners");
+      return;
+    }
+
+    if (JobModelUtil.compareContainerModelForProcessor(processorId, activeJobModel, newJobModel)) {
+      LOG.info("Skipping job model expiration for processor {} due to no change in work assignment.", processorId);
+    } else {
+      coordinatorListener.onJobModelExpired();
+    }
+  }
+
+  /**
+   * Checks if the new job model contains a different work assignment for the processor compared to the last active
+   * job model. In case of different work assignment, update the task locality of the tasks associated with the
+   * processor and notify new job model to the registered {@link JobCoordinatorListener}.
+   *
+   * @param newJobModel new job model agreed by the quorum
+   */
+  @VisibleForTesting
+  void onNewJobModel(JobModel newJobModel) {
+    Preconditions.checkNotNull(newJobModel, "JobModel cannot be null. Failing onNewJobModel");
+    // start the container with the new model
+    if (!JobModelUtil.compareContainerModelForProcessor(processorId, activeJobModel, newJobModel)) {
+      if (newJobModel.getContainers().containsKey(processorId)) {

Review comment:
       What happens in the else { } case, i.e. when the new job model has no container for this processorId?

##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
##########
@@ -392,6 +395,78 @@ StartpointManager createStartpointManager() {
     return new StartpointManager(coordinatorStreamStore);
   }
 
+  /**
+   * Check if the new job model contains a different work assignment for the processor compared the last active job
+   * model. In case of different work assignment, expire the current job model by invoking the <i>onJobModelExpired</i>
+   * on the registered {@link JobCoordinatorListener}.
+   * At this phase, the job model is yet to be agreed by the quorum and hence, this optimization helps availability of
+   * the processors in the event no changes in the work assignment.
+   *
+   * @param newJobModel new job model published by the leader
+   */
+  @VisibleForTesting
+  void checkAndExpireJobModel(JobModel newJobModel) {
+    Preconditions.checkNotNull(newJobModel, "JobModel cannot be null");
+    if (coordinatorListener == null) {
+      LOG.info("Skipping job model expiration since there are no active listeners");
+      return;
+    }
+
+    if (JobModelUtil.compareContainerModelForProcessor(processorId, activeJobModel, newJobModel)) {
+      LOG.info("Skipping job model expiration for processor {} due to no change in work assignment.", processorId);
+    } else {
+      coordinatorListener.onJobModelExpired();

Review comment:
       Add a LOG line in this else branch too.
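
       For illustration only, a minimal self-contained sketch of the shape this could take. Everything here is a hypothetical stand-in rather than the Samza code: the class ExpireJobModelSketch, the Listener interface, the in-memory logLines list (used in place of LOG), the boolean parameter standing in for the result of JobModelUtil.compareContainerModelForProcessor, and the wording of the new log message.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the reviewed method; not the Samza implementation. */
class ExpireJobModelSketch {
  /** Hypothetical listener exposing only the callback used in the diff. */
  interface Listener {
    void onJobModelExpired();
  }

  // Log lines are collected in memory so the sketch needs no logging library.
  final List<String> logLines = new ArrayList<>();
  private final String processorId;
  private final Listener listener;

  ExpireJobModelSketch(String processorId, Listener listener) {
    this.processorId = processorId;
    this.listener = listener;
  }

  /**
   * @param workAssignmentUnchanged assumed to carry the result of the
   *        container model comparison done in the real method
   */
  void checkAndExpire(boolean workAssignmentUnchanged) {
    if (listener == null) {
      logLines.add("Skipping job model expiration since there are no active listeners");
      return;
    }

    if (workAssignmentUnchanged) {
      logLines.add("Skipping job model expiration for processor " + processorId
          + " due to no change in work assignment.");
    } else {
      // The suggested addition: log before expiring, so both branches leave a trace.
      logLines.add("Work assignment changed for processor " + processorId
          + ". Expiring the current job model.");
      listener.onJobModelExpired();
    }
  }
}
```

       With a line like this in place, the logs show which branch was taken for every new job model, not only the skipped ones.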
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

