shanthoosh commented on a change in pull request #1478:
URL: https://github.com/apache/samza/pull/1478#discussion_r606426938



##########
File path: samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
##########
@@ -119,6 +120,47 @@ public static JobModel readJobModel(String jobModelVersion, MetadataStore metada
     }
   }
 
+  /**
+   * Compares the {@link ContainerModel} for a given <i>processorId</i> across two {@link JobModel}.
+   * @param processorId processor id for which work assignments are compared
+   * @param first first job model
+   * @param second second job model
+   * @return true - if {@link ContainerModel} for the processor is same across the {@link JobModel}
+   *         false - otherwise
+   */
+  public static boolean compareContainerModelForProcessor(String processorId, JobModel first, JobModel second) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(processorId), "Processor id cannot be blank");
+    if (first == second) {
+      return true;
+    }
+
+    if (first == null || second == null) {
+      return false;
+    }
+
+    return compareContainerModel(first.getContainers().get(processorId), second.getContainers().get(processorId));
+  }
+
+  /**
+   * Helper method to compare the two input {@link ContainerModel}s.
+   * @param first first container model
+   * @param second second container model
+   * @return true - if two input {@link ContainerModel} are equal
+   *         false - otherwise
+   */
+  @VisibleForTesting
+  static boolean compareContainerModel(ContainerModel first, ContainerModel second) {

Review comment:
       Minor:
   I may be wrong, but AFAIK `Objects.equals` from the Java standard library 
(`java.util.Objects`) already performs these null checks. If so, we can reuse 
it here.
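
For illustration, a minimal sketch of what this could look like. It assumes 
`ContainerModel` implements `equals`; a plain `String` stands in for it so the 
snippet is self-contained, and the class name `ObjectsEqualsSketch` is 
hypothetical.

```java
import java.util.Objects;

public class ObjectsEqualsSketch {
  // Stand-in for compareContainerModel; String replaces ContainerModel (assumption).
  public static boolean compareContainerModel(String first, String second) {
    // Objects.equals returns true when both arguments are null, false when
    // exactly one is null, and otherwise delegates to first.equals(second).
    return Objects.equals(first, second);
  }

  public static void main(String[] args) {
    System.out.println(compareContainerModel(null, null));
    System.out.println(compareContainerModel("a", null));
    System.out.println(compareContainerModel("a", "a"));
  }
}
```

With this, the explicit reference and null checks in the helper would no longer 
be needed, provided the existing semantics (two nulls compare as equal) are the 
intended ones.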

##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
##########
@@ -392,6 +395,90 @@ StartpointManager createStartpointManager() {
     return new StartpointManager(coordinatorStreamStore);
   }
 
+  /**
+   * Check if the new job model contains a different work assignment for the processor compared the last active job
+   * model. In case of different work assignment, expire the current job model by invoking the <i>onJobModelExpired</i>
+   * on the registered {@link JobCoordinatorListener}.
+   * At this phase, the job model is yet to be agreed by the quorum and hence, this optimization helps availability of
+   * the processors in the event no changes in the work assignment.
+   *
+   * @param newJobModel new job model published by the leader
+   */
+  @VisibleForTesting
+  void checkAndExpireJobModel(JobModel newJobModel) {
+    Preconditions.checkNotNull(newJobModel, "JobModel cannot be null");
+    if (coordinatorListener == null) {
+      LOG.info("Skipping job model expiration since there are no active listeners");
+      return;
+    }
+
+    if (JobModelUtil.compareContainerModelForProcessor(processorId, activeJobModel, newJobModel)) {
+      LOG.info("Skipping job model expiration for processor {} due to no change in work assignment.", processorId);

Review comment:
       Minor:
   It would be helpful to log either the job model or the work assignment here 
for debugging purposes, i.e. the before/after work assignment on a quorum 
change. Logging it only when the two differ might be better.
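
A rough sketch of the kind of message I have in mind. The helper `describe` 
and the class name are hypothetical, `Object` stands in for `ContainerModel`, 
and the returned string would be handed to the existing `LOG.info` call.

```java
import java.util.Objects;

public class AssignmentLogSketch {
  // Hypothetical helper: builds the log message for a quorum change.
  // Object stands in for ContainerModel (assumption).
  public static String describe(String processorId, Object before, Object after) {
    if (Objects.equals(before, after)) {
      // Unchanged assignment: keep the short message from the patch.
      return String.format(
          "Skipping job model expiration for processor %s due to no change in work assignment.",
          processorId);
    }
    // Changed assignment: include both sides so the change can be debugged.
    return String.format(
        "Work assignment changed for processor %s. Before: %s, after: %s",
        processorId, before, after);
  }

  public static void main(String[] args) {
    System.out.println(describe("p1", "tasks-a", "tasks-a"));
    System.out.println(describe("p1", "tasks-a", "tasks-b"));
  }
}
```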

##########
File path: samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
##########
@@ -119,6 +120,47 @@ public static JobModel readJobModel(String jobModelVersion, MetadataStore metada
     }
   }
 
+  /**
+   * Compares the {@link ContainerModel} for a given <i>processorId</i> across two {@link JobModel}.
+   * @param processorId processor id for which work assignments are compared
+   * @param first first job model
+   * @param second second job model
+   * @return true - if {@link ContainerModel} for the processor is same across the {@link JobModel}
+   *         false - otherwise
+   */
+  public static boolean compareContainerModelForProcessor(String processorId, JobModel first, JobModel second) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(processorId), "Processor id cannot be blank");
+    if (first == second) {
+      return true;
+    }
+
+    if (first == null || second == null) {
+      return false;
+    }
+
+    return compareContainerModel(first.getContainers().get(processorId), second.getContainers().get(processorId));

Review comment:
       Do you think it would be useful to add null guards for the result of 
`jobModel.getContainers()`?
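
Something along these lines, as a sketch only. Whether `getContainers()` can 
actually return null is an assumption on my part; `Map<String, String>` stands 
in for the containers map, and the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class ContainersGuardSketch {
  // Map<String, String> stands in for the map returned by getContainers() (assumption).
  public static boolean compareForProcessor(String processorId,
      Map<String, String> first, Map<String, String> second) {
    // Treat a null containers map as empty so that get() cannot throw an NPE.
    Map<String, String> a = first == null ? Collections.<String, String>emptyMap() : first;
    Map<String, String> b = second == null ? Collections.<String, String>emptyMap() : second;
    // A processor missing from both maps compares as equal (both lookups yield null).
    return Objects.equals(a.get(processorId), b.get(processorId));
  }

  public static void main(String[] args) {
    System.out.println(compareForProcessor("p1", null, Collections.singletonMap("p1", "x")));
  }
}
```

Note that this treats a null map the same as an empty one. If a null map should 
instead be reported as an error, a `Preconditions.checkNotNull` would be the 
better fit.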



