This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 120098d  SAMZA-2670: Update granularity of 
JobCoordinatorMetadataManager.checkForMetadataChanges and epoch id extraction 
(#1519)
120098d is described below

commit 120098d6c45ca023d2a0fb144b0e333ab4e8ffe8
Author: Cameron Lee <[email protected]>
AuthorDate: Tue Aug 24 14:08:17 2021 -0700

    SAMZA-2670: Update granularity of 
JobCoordinatorMetadataManager.checkForMetadataChanges and epoch id extraction 
(#1519)
    
    Changes:
    1. Modify checkForMetadataChanges to return a set of changes instead of a 
boolean.
    2. Modify fetchEpochIdForJobCoordinator to read epoch id out of a generic 
environment variable instead of parsing it out of a YARN-specific variable.
    3. Move JobCoordinatorMetadataManager to a new package because 
org.apache.samza.coordinator is bloated.
    
    API changes: For non-YARN usage of JobCoordinatorMetadataManager, pass the 
epoch id through the SAMZA_EPOCH_ID environment variable. There are currently 
no non-YARN usages, but one is coming in the near future.
---
 .../versioned/container/metrics-table.html         |   2 +-
 .../org/apache/samza/job/JobMetadataChange.java    |  31 +++++
 .../clustermanager/ClusterBasedJobCoordinator.java |   4 +-
 .../samza/environment/EnvironmentVariables.java    |  39 ++++++
 .../metadata}/JobCoordinatorMetadataManager.java   |  83 ++++++-----
 .../TestClusterBasedJobCoordinator.java            |  10 +-
 .../TestJobCoordinatorMetadataManager.java         | 154 +++++++++++++++------
 7 files changed, 242 insertions(+), 81 deletions(-)

diff --git a/docs/learn/documentation/versioned/container/metrics-table.html 
b/docs/learn/documentation/versioned/container/metrics-table.html
index 3bab5be..b7cd752 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -984,7 +984,7 @@
         <td><a href="#average-time">Average time</a> taken for all the 
processors to get the latest version of the job model after single processor 
change (without the occurence of a barrier timeout)</td>
     </tr>
     <tr>
-        <th colspan="2" class="section" 
id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span
 style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following 
metrics are applicable when Application Master High Availability is 
enabled</span></th>
+        <th colspan="2" class="section" 
id="job-coordinator-metadata-manager-metrics">org.apache.samza.job.metadata.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span
 style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following 
metrics are applicable when Application Master High Availability is 
enabled</span></th>
     </tr>
     <tr>
         <td>application-attempt-count</td>
diff --git 
a/samza-api/src/main/java/org/apache/samza/job/JobMetadataChange.java 
b/samza-api/src/main/java/org/apache/samza/job/JobMetadataChange.java
new file mode 100644
index 0000000..9048a2d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/JobMetadataChange.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job;
+
+/**
+ * Provides granularity into changes in job metadata (e.g. new job model, new 
config).
+ */
+public enum JobMetadataChange {
+  // indicates a new deployment of a job
+  NEW_DEPLOYMENT,
+  // indicates a new job model compared to the previous job model
+  JOB_MODEL,
+  // indicates a new set of configs compared to the previous configs
+  CONFIG
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index e956413..b62678b 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -38,7 +38,7 @@ import org.apache.samza.container.ExecutionContainerIdManager;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.InputStreamsDiscoveredException;
-import org.apache.samza.coordinator.JobCoordinatorMetadataManager;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.MetadataResourceUtil;
 import org.apache.samza.coordinator.PartitionChangeException;
@@ -369,7 +369,7 @@ public class ClusterBasedJobCoordinator {
 
     JobCoordinatorMetadata previousMetadata = 
jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
     JobCoordinatorMetadata newMetadata = 
jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, config);
-    if (jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata)) {
+    if (!jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata).isEmpty()) {
       jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
       metadataChangedAcrossAttempts = true;
     }
diff --git 
a/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
 
b/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
new file mode 100644
index 0000000..91977a2
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.environment;
+
+/**
+ * Contains environment variables that are used by Samza components. This 
provides a common place to put these variables
+ * and provide documentation for them.
+ */
+public class EnvironmentVariables {
+  /**
+   * The properties of the epoch identifier are as follows
+   * 1. Unique across applications in the cluster
+   * 2. Remains unchanged within a single deployment lifecycle
+   * 3. Remains unchanged across application attempt within a single 
deployment lifecycle
+   * 4. Changes across deployment lifecycle
+   *
+   * If using a non-YARN environment, then this needs to be filled in so that 
the job coordinator can properly manage
+   * changes (e.g. job model changes) within and across deployments.
+   * See JobCoordinatorMetadataManager.fetchEpochIdForJobCoordinator for more 
details about the YARN alternative to this
+   * environment variable.
+   */
+  public static final String SAMZA_EPOCH_ID = "SAMZA_EPOCH_ID";
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
 
b/samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
similarity index 86%
rename from 
samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
rename to 
samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
index 7e40382..bc61a7d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza.coordinator;
+package org.apache.samza.job.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
@@ -25,15 +25,19 @@ import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.Hashing;
 import com.google.common.hash.PrimitiveSink;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import org.apache.samza.SamzaException;
+import org.apache.samza.environment.EnvironmentVariables;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
 import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metrics.Gauge;
@@ -77,7 +81,7 @@ public class JobCoordinatorMetadataManager {
   }
 
   /**
-   * Generates {@link JobCoordinatorMetadata} for the {@link JobCoordinator}.
+   * Generates {@link JobCoordinatorMetadata}.
    *
    * Epoch ID - It is generated by {@link #fetchEpochIdForJobCoordinator()}. 
Refer to the javadocs for more
    * details on how it is generated and the properties of the identifier.
@@ -96,7 +100,6 @@ public class JobCoordinatorMetadataManager {
    * Both config and job model identifiers should a 32 bit integer.
    *
    * @param jobModel job model used for generating the metadata
-   * @param config config used for generating the metadata
    *
    * @return the metadata for the job coordinator
    */
@@ -134,29 +137,34 @@ public class JobCoordinatorMetadataManager {
    *
    * @return true if metadata changed, false otherwise
    */
-  public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, 
JobCoordinatorMetadata previousMetadata) {
-    boolean changed = true;
-
-    if (previousMetadata == null) {
-      metrics.setNewDeployment(1);
-    } else if 
(!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+  public Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata 
newMetadata,
+      JobCoordinatorMetadata previousMetadata) {
+    Set<JobMetadataChange> changes = new HashSet<>();
+    boolean newDeployment = false;
+    if (previousMetadata == null || 
!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
       metrics.setNewDeployment(1);
-    } else if 
(!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
-      metrics.setJobModelChangedAcrossApplicationAttempt(1);
-    } else if 
(!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
-      metrics.setConfigChangedAcrossApplicationAttempt(1);
-    } else {
-      changed = false;
-      metrics.incrementApplicationAttemptCount();
+      newDeployment = true;
+      changes.add(JobMetadataChange.NEW_DEPLOYMENT);
     }
-
-    if (changed) {
-      LOG.info("Job coordinator metadata changed from: {} to: {}", 
previousMetadata, newMetadata);
-    } else {
+    if (previousMetadata == null || 
!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+      if (!newDeployment) {
+        metrics.setJobModelChangedAcrossApplicationAttempt(1);
+      }
+      changes.add(JobMetadataChange.JOB_MODEL);
+    }
+    if (previousMetadata == null || 
!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+      if (!newDeployment) {
+        metrics.setConfigChangedAcrossApplicationAttempt(1);
+      }
+      changes.add(JobMetadataChange.CONFIG);
+    }
+    if (changes.isEmpty()) {
       LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+      metrics.incrementApplicationAttemptCount();
+    } else {
+      LOG.info("Job coordinator metadata changed from: {} to: {}", 
previousMetadata, newMetadata);
     }
-
-    return changed;
+    return changes;
   }
 
   /**
@@ -207,29 +215,33 @@ public class JobCoordinatorMetadataManager {
   }
 
   /**
-   * Generate the epoch id using the execution container id that is passed 
through system environment. This isn't ideal
-   * way of generating this ID and we will need some contract between the 
underlying cluster manager and samza engine
-   * around what the epoch ID should be like and what is needed to generate is 
across different cluster offerings.
-   * Due to unknowns defined above, we leave it as is and keep it simple for 
now. It is favorable to keep it this way
-   * instead of introducing a loosely defined interface/API and marking it 
unstable.
-   *
    * The properties of the epoch identifier are as follows
    *  1. Unique across applications in the cluster
    *  2. Remains unchanged within a single deployment lifecycle
    *  3. Remains unchanged across application attempt within a single 
deployment lifecycle
    *  4. Changes across deployment lifecycle
    *
-   *  Note: The above properties is something we want keep intact when 
extracting this into a well defined interface
-   *  or contract for YARN AM HA to work.
-   *  The format and property used to generate ID is specific to YARN and the 
specific format of the container name
-   *  is a public contract by YARN which is likely to remain backward 
compatible.
+   * For YARN environment:
+   * Generate the epoch id using the execution container id that is passed 
through the system environment. This isn't an ideal
+   * way of generating this ID, since it is YARN-specific. This is left as a 
legacy flow for backwards compatibility, as
+   * the original implementation did not define a cluster-agnostic contract.
+   * The format and property used to generate ID is specific to YARN and the 
specific format of the container name
+   * is a public contract by YARN which is likely to remain backward 
compatible.
+   *
+   * For non-YARN environments:
+   * Extract this from the environment variable SAMZA_EPOCH_ID. This is a more 
generic way of obtaining an epoch id, but
+   * it does require the resource management layer to inject this value.
    *
    * @return an identifier associated with the job coordinator satisfying the 
above properties
    */
   @VisibleForTesting
   String fetchEpochIdForJobCoordinator() {
-    String[] containerIdParts = 
getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
-    return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
+    if (ClusterType.YARN.equals(this.clusterType)) {
+      String[] containerIdParts = 
getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
+      return containerIdParts[1] + CONTAINER_ID_DELIMITER + 
containerIdParts[2];
+    } else {
+      return getEnvProperty(EnvironmentVariables.SAMZA_EPOCH_ID);
+    }
   }
 
   @VisibleForTesting
@@ -267,7 +279,8 @@ public class JobCoordinatorMetadataManager {
    * Type of the cluster deployment associated with the {@link 
JobCoordinatorMetadataManager}
    */
   public enum ClusterType {
-    YARN
+    YARN,
+    NON_YARN
   }
 
   /**
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index e0b0739..6eeb119 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -20,6 +20,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,7 +33,8 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.coordinator.JobCoordinatorMetadataManager;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.JobMetadataChange;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
@@ -249,7 +251,8 @@ public class TestClusterBasedJobCoordinator {
 
     
when(jobCoordinatorMetadataManager.readJobCoordinatorMetadata()).thenReturn(previousMetadata);
     when(jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(any(), 
any())).thenReturn(newMetadata);
-    when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata)).thenReturn(false);
+    when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata)).thenReturn(
+        ImmutableSet.of());
     
when(clusterBasedJobCoordinator.createJobCoordinatorMetadataManager()).thenReturn(jobCoordinatorMetadataManager);
 
     /*
@@ -264,7 +267,8 @@ public class TestClusterBasedJobCoordinator {
     /*
      * Verify if there are changes to metadata, we persist the new metadata & 
update the metadata changed flag
      */
-    when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata)).thenReturn(true);
+    when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata)).thenReturn(
+        ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT));
     
clusterBasedJobCoordinator.generateAndUpdateJobCoordinatorMetadata(mockJobModel);
     assertTrue("JC metadata changed should be true", 
clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts());
     verify(jobCoordinatorMetadataManager, 
times(1)).writeJobCoordinatorMetadata(newMetadata);
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
 
b/samza-core/src/test/java/org/apache/samza/job/metadata/TestJobCoordinatorMetadataManager.java
similarity index 59%
rename from 
samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
rename to 
samza-core/src/test/java/org/apache/samza/job/metadata/TestJobCoordinatorMetadataManager.java
index b1739d9..c4ad179 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/job/metadata/TestJobCoordinatorMetadataManager.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza.coordinator;
+package org.apache.samza.job.metadata;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -32,6 +33,7 @@ import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStrea
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
 import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
@@ -41,11 +43,10 @@ import org.apache.samza.serializers.Serde;
 import org.junit.Before;
 import org.junit.Test;
 
-import static 
org.apache.samza.coordinator.JobCoordinatorMetadataManager.ClusterType;
-import static 
org.apache.samza.coordinator.JobCoordinatorMetadataManager.CONTAINER_ID_DELIMITER;
-import static 
org.apache.samza.coordinator.JobCoordinatorMetadataManager.CONTAINER_ID_PROPERTY;
+import static 
org.apache.samza.job.metadata.JobCoordinatorMetadataManager.CONTAINER_ID_DELIMITER;
+import static 
org.apache.samza.job.metadata.JobCoordinatorMetadataManager.CONTAINER_ID_PROPERTY;
+import static 
org.apache.samza.job.metadata.JobCoordinatorMetadataManager.ClusterType;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -108,46 +109,119 @@ public class TestJobCoordinatorMetadataManager {
   }
 
   @Test
-  public void testCheckForMetadataChanges() {
-    JobCoordinatorMetadata previousMetadata = new 
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
-    JobCoordinatorMetadata newMetadataWithDifferentEpochId =
-        new JobCoordinatorMetadata(NEW_EPOCH_ID, OLD_CONFIG_ID, 
OLD_JOB_MODEL_ID);
-
-    JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics metrics 
=
-        jobCoordinatorMetadataManager.getMetrics();
+  public void testCheckForMetadataChangesNoPreviousMetadata() {
+    JobCoordinatorMetadata newMetadata = new 
JobCoordinatorMetadata(NEW_EPOCH_ID, NEW_CONFIG_ID, NEW_JOB_MODEL_ID);
+    Set<JobMetadataChange> metadataChanges =
+        
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, null);
+    assertEquals("Metadata check should indicate all changes",
+        ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, 
JobMetadataChange.JOB_MODEL, JobMetadataChange.CONFIG),
+        metadataChanges);
+    assertEquals("New deployment should be 1 since Epoch ID changed", 1,
+        
this.jobCoordinatorMetadataManager.getMetrics().getNewDeployment().getValue().intValue());
+    assertEquals("Job model changed across application attempts should be 0", 
0,
+        this.jobCoordinatorMetadataManager.getMetrics()
+            .getJobModelChangedAcrossApplicationAttempt()
+            .getValue()
+            .intValue());
+    assertEquals("Config changed across application attempts should be 0", 0,
+        this.jobCoordinatorMetadataManager.getMetrics()
+            .getConfigChangedAcrossApplicationAttempt()
+            .getValue()
+            .intValue());
+  }
 
-    boolean metadataChanged =
-        
jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, 
newMetadataWithDifferentEpochId);
-    assertTrue("Metadata check should return true", metadataChanged);
+  @Test
+  public void testCheckForMetadataChangesNewDeployment() {
+    JobCoordinatorMetadata previousMetadata = new 
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+    JobCoordinatorMetadata newMetadata = new 
JobCoordinatorMetadata(NEW_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+    Set<JobMetadataChange> metadataChanges =
+        
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata);
+    assertEquals("Metadata check should indicate new deployment", 
ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT),
+        metadataChanges);
     assertEquals("New deployment should be 1 since Epoch ID changed", 1,
-        metrics.getNewDeployment().getValue().intValue());
-
-    JobCoordinatorMetadata newMetadataWithDifferentConfigId =
-        new JobCoordinatorMetadata(OLD_EPOCH_ID, NEW_CONFIG_ID, 
OLD_JOB_MODEL_ID);
-    metadataChanged =
-        
jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, 
newMetadataWithDifferentConfigId);
-    assertTrue("Metadata check should return true", metadataChanged);
-    assertEquals("Config across application attempts should be 1", 1,
-        
metrics.getConfigChangedAcrossApplicationAttempt().getValue().intValue());
-
-    JobCoordinatorMetadata newMetadataWithDifferentJobModelId =
-        new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, 
NEW_JOB_MODEL_ID);
-    metadataChanged =
-        
jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, 
newMetadataWithDifferentJobModelId);
-    assertTrue("Metadata check should return true", metadataChanged);
+        
this.jobCoordinatorMetadataManager.getMetrics().getNewDeployment().getValue().intValue());
+    assertEquals("Application attempt count should be 0", 0,
+        
this.jobCoordinatorMetadataManager.getMetrics().getApplicationAttemptCount().getValue().intValue());
+  }
+
+  @Test
+  public void testCheckForMetadataChangesJobModelChange() {
+    JobCoordinatorMetadata previousMetadata = new 
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+    JobCoordinatorMetadata newMetadata = new 
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, NEW_JOB_MODEL_ID);
+    Set<JobMetadataChange> metadataChanges =
+        
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata);
+    assertEquals("Metadata check should indicate new job model", 
ImmutableSet.of(JobMetadataChange.JOB_MODEL),
+        metadataChanges);
     assertEquals("Job model changed across application attempts should be 1", 
1,
-        
metrics.getJobModelChangedAcrossApplicationAttempt().getValue().intValue());
+        this.jobCoordinatorMetadataManager.getMetrics()
+            .getJobModelChangedAcrossApplicationAttempt()
+            .getValue()
+            .intValue());
+    assertEquals("Application attempt count should be 0", 0,
+        
this.jobCoordinatorMetadataManager.getMetrics().getApplicationAttemptCount().getValue().intValue());
+  }
 
-    JobCoordinatorMetadata newMetadataWithNoChange =
-        new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, 
OLD_JOB_MODEL_ID);
+  @Test
+  public void testCheckForMetadataChangesConfigChange() {
+    JobCoordinatorMetadata previousMetadata = new 
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+    JobCoordinatorMetadata newMetadata = new 
JobCoordinatorMetadata(OLD_EPOCH_ID, NEW_CONFIG_ID, OLD_JOB_MODEL_ID);
+    Set<JobMetadataChange> metadataChanges =
+        
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata);
+    assertEquals("Metadata check should indicate new config", 
ImmutableSet.of(JobMetadataChange.CONFIG),
+        metadataChanges);
+    assertEquals("Config changed across application attempts should be 1", 1,
+        this.jobCoordinatorMetadataManager.getMetrics()
+            .getConfigChangedAcrossApplicationAttempt()
+            .getValue()
+            .intValue());
     assertEquals("Application attempt count should be 0", 0,
-        metrics.getApplicationAttemptCount().getValue().intValue());
+        
this.jobCoordinatorMetadataManager.getMetrics().getApplicationAttemptCount().getValue().intValue());
+  }
 
-    metadataChanged =
-        
jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata, 
newMetadataWithNoChange);
-    assertFalse("Metadata check should return false", metadataChanged);
+  @Test
+  public void testCheckForMetadataChangesMultipleChanges() {
+    JobCoordinatorMetadata previousMetadata = new 
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+    JobCoordinatorMetadata newMetadata = new 
JobCoordinatorMetadata(NEW_EPOCH_ID, NEW_CONFIG_ID, NEW_JOB_MODEL_ID);
+    Set<JobMetadataChange> metadataChanges =
+        
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata);
+    assertEquals("Metadata check should indicate all changes",
+        ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT, 
JobMetadataChange.JOB_MODEL, JobMetadataChange.CONFIG),
+        metadataChanges);
+    assertEquals("New deployment should be 1 since Epoch ID changed", 1,
+        
this.jobCoordinatorMetadataManager.getMetrics().getNewDeployment().getValue().intValue());
+    assertEquals("Job model changed across application attempts should be 0", 
0,
+        this.jobCoordinatorMetadataManager.getMetrics()
+            .getJobModelChangedAcrossApplicationAttempt()
+            .getValue()
+            .intValue());
+    assertEquals("Config changed across application attempts should be 0", 0,
+        this.jobCoordinatorMetadataManager.getMetrics()
+            .getConfigChangedAcrossApplicationAttempt()
+            .getValue()
+            .intValue());
+  }
+
+  @Test
+  public void testCheckForMetadataChangesNoChanges() {
+    JobCoordinatorMetadata previousMetadata = new 
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+    JobCoordinatorMetadata newMetadata = new 
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+    Set<JobMetadataChange> metadataChanges =
+        
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata);
+    assertEquals("Metadata check should indicate no changes", 
ImmutableSet.of(), metadataChanges);
+    assertEquals("New deployment should be 0 since Epoch ID did not change", 0,
+        
this.jobCoordinatorMetadataManager.getMetrics().getNewDeployment().getValue().intValue());
+    assertEquals("Job model changed across application attempts should be 0", 
0,
+        this.jobCoordinatorMetadataManager.getMetrics()
+            .getJobModelChangedAcrossApplicationAttempt()
+            .getValue()
+            .intValue());
+    assertEquals("Config changed across application attempts should be 0", 0,
+        this.jobCoordinatorMetadataManager.getMetrics()
+            .getConfigChangedAcrossApplicationAttempt()
+            .getValue()
+            .intValue());
     assertEquals("Application attempt count should be 1", 1,
-        metrics.getApplicationAttemptCount().getValue().intValue());
+        
this.jobCoordinatorMetadataManager.getMetrics().getApplicationAttemptCount().getValue().intValue());
   }
 
   @Test

Reply via email to