shanthoosh commented on a change in pull request #1478:
URL: https://github.com/apache/samza/pull/1478#discussion_r606426938
##########
File path: samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
##########
@@ -119,6 +120,47 @@ public static JobModel readJobModel(String
jobModelVersion, MetadataStore metada
}
}
+ /**
+ * Compares the {@link ContainerModel} for a given <i>processorId</i> across
two {@link JobModel}.
+ * @param processorId processor id for which work assignments are compared
+ * @param first first job model
+ * @param second second job model
+ * @return true - if {@link ContainerModel} for the processor is same across
the {@link JobModel}
+ * false - otherwise
+ */
+ public static boolean compareContainerModelForProcessor(String processorId,
JobModel first, JobModel second) {
+ Preconditions.checkArgument(StringUtils.isNotBlank(processorId),
"Processor id cannot be blank");
+ if (first == second) {
+ return true;
+ }
+
+ if (first == null || second == null) {
+ return false;
+ }
+
+ return compareContainerModel(first.getContainers().get(processorId),
second.getContainers().get(processorId));
+ }
+
+ /**
+ * Helper method to compare the two input {@link ContainerModel}s.
+ * @param first first container model
+ * @param second second container model
+ * @return true - if two input {@link ContainerModel} are equal
+ * false - otherwise
+ */
+ @VisibleForTesting
+ static boolean compareContainerModel(ContainerModel first, ContainerModel
second) {
Review comment:
Minor:
I may be wrong. But AFAIK, I think `Objects.equals` from java-native already
has all these NPE checks . If possible, then we can re-use them.
##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
##########
@@ -392,6 +395,90 @@ StartpointManager createStartpointManager() {
return new StartpointManager(coordinatorStreamStore);
}
+ /**
+ * Check if the new job model contains a different work assignment for the
processor compared the last active job
+ * model. In case of different work assignment, expire the current job model
by invoking the <i>onJobModelExpired</i>
+ * on the registered {@link JobCoordinatorListener}.
+ * At this phase, the job model is yet to be agreed by the quorum and hence,
this optimization helps availability of
+ * the processors in the event no changes in the work assignment.
+ *
+ * @param newJobModel new job model published by the leader
+ */
+ @VisibleForTesting
+ void checkAndExpireJobModel(JobModel newJobModel) {
+ Preconditions.checkNotNull(newJobModel, "JobModel cannot be null");
+ if (coordinatorListener == null) {
+ LOG.info("Skipping job model expiration since there are no active
listeners");
+ return;
+ }
+
+ if (JobModelUtil.compareContainerModelForProcessor(processorId,
activeJobModel, newJobModel)) {
+ LOG.info("Skipping job model expiration for processor {} due to no
change in work assignment.", processorId);
Review comment:
Minor:
Would be helpful to either log the jobModel or the work-assignment here for
debugging purposes. Essentially logging the before/after work-assignment on a
quorum change just for debugging purposes. May-be logging it when they're
different would be better.
##########
File path: samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java
##########
@@ -119,6 +120,47 @@ public static JobModel readJobModel(String
jobModelVersion, MetadataStore metada
}
}
+ /**
+ * Compares the {@link ContainerModel} for a given <i>processorId</i> across
two {@link JobModel}.
+ * @param processorId processor id for which work assignments are compared
+ * @param first first job model
+ * @param second second job model
+ * @return true - if {@link ContainerModel} for the processor is same across
the {@link JobModel}
+ * false - otherwise
+ */
+ public static boolean compareContainerModelForProcessor(String processorId,
JobModel first, JobModel second) {
+ Preconditions.checkArgument(StringUtils.isNotBlank(processorId),
"Processor id cannot be blank");
+ if (first == second) {
+ return true;
+ }
+
+ if (first == null || second == null) {
+ return false;
+ }
+
+ return compareContainerModel(first.getContainers().get(processorId),
second.getContainers().get(processorId));
Review comment:
Do you think it will be useful to add NPE guards against the result of
the object obtained by `jobModel.getContainers()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]