cameronlee314 commented on a change in pull request #1519:
URL: https://github.com/apache/samza/pull/1519#discussion_r695191504



##########
File path: 
samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
##########
@@ -134,29 +135,34 @@ public JobCoordinatorMetadata 
generateJobCoordinatorMetadata(JobModel jobModel,
    *
    * @return true if metadata changed, false otherwise
    */
-  public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, 
JobCoordinatorMetadata previousMetadata) {
-    boolean changed = true;
-
-    if (previousMetadata == null) {
-      metrics.setNewDeployment(1);
-    } else if 
(!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+  public Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata 
newMetadata,
+      JobCoordinatorMetadata previousMetadata) {
+    Set<JobMetadataChange> changes = new HashSet<>();
+    boolean newDeployment = false;
+    if (previousMetadata == null || 
!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
       metrics.setNewDeployment(1);
-    } else if 
(!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
-      metrics.setJobModelChangedAcrossApplicationAttempt(1);
-    } else if 
(!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
-      metrics.setConfigChangedAcrossApplicationAttempt(1);
-    } else {
-      changed = false;
-      metrics.incrementApplicationAttemptCount();
+      newDeployment = true;
+      changes.add(JobMetadataChange.NEW_DEPLOYMENT);
     }
-
-    if (changed) {
-      LOG.info("Job coordinator metadata changed from: {} to: {}", 
previousMetadata, newMetadata);
-    } else {
+    if (previousMetadata == null || 
!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+      if (!newDeployment) {
+        metrics.setJobModelChangedAcrossApplicationAttempt(1);
+      }
+      changes.add(JobMetadataChange.JOB_MODEL);
+    }
+    if (previousMetadata == null || 
!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+      if (!newDeployment) {
+        metrics.setConfigChangedAcrossApplicationAttempt(1);
+      }
+      changes.add(JobMetadataChange.CONFIG);
+    }
+    if (changes.isEmpty()) {
       LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+      metrics.incrementApplicationAttemptCount();
+    } else {
+      LOG.info("Job coordinator metadata changed from: {} to: {}", 
previousMetadata, newMetadata);

Review comment:
       Oh ok, if you are fine changing the metrics, then we can do that. Seems 
a little out-of-scope of the intention of this PR, but I can change the metrics 
to just emit "job model changed" and "config changed" instead of "job model 
changed across attempts" and "config changed across attempts". Is that what you 
were thinking?
   If we do this, I still prefer to not short-circuit on `previousMetadata == 
null`, because then we would need to have multiple places where each metric and 
each `JobMetadataChange` gets added to the results list.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to