This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 120098d SAMZA-2670: Update granularity of
JobCoordinatorMetadataManager.checkForMetadataChanges and epoch id extraction
(#1519)
120098d is described below
commit 120098d6c45ca023d2a0fb144b0e333ab4e8ffe8
Author: Cameron Lee <[email protected]>
AuthorDate: Tue Aug 24 14:08:17 2021 -0700
SAMZA-2670: Update granularity of
JobCoordinatorMetadataManager.checkForMetadataChanges and epoch id extraction
(#1519)
Changes:
1. Modify checkForMetadataChanges to return a set of changes instead of a
boolean.
2. Modify fetchEpochIdForJobCoordinator to read epoch id out of a generic
environment variable instead of parsing it out of a YARN-specific variable.
3. Move JobCoordinatorMetadataManager to a new package because
org.apache.samza.coordinator is bloated.
API changes: For non-YARN usage of JobCoordinatorMetadataManager, pass the
epoch id through the SAMZA_EPOCH_ID environment variable. There is currently
no non-YARN usage, but one will be added in the near future.
---
.../versioned/container/metrics-table.html | 2 +-
.../org/apache/samza/job/JobMetadataChange.java | 31 +++++
.../clustermanager/ClusterBasedJobCoordinator.java | 4 +-
.../samza/environment/EnvironmentVariables.java | 39 ++++++
.../metadata}/JobCoordinatorMetadataManager.java | 83 ++++++-----
.../TestClusterBasedJobCoordinator.java | 10 +-
.../TestJobCoordinatorMetadataManager.java | 154 +++++++++++++++------
7 files changed, 242 insertions(+), 81 deletions(-)
diff --git a/docs/learn/documentation/versioned/container/metrics-table.html
b/docs/learn/documentation/versioned/container/metrics-table.html
index 3bab5be..b7cd752 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -984,7 +984,7 @@
<td><a href="#average-time">Average time</a> taken for all the
processors to get the latest version of the job model after single processor
change (without the occurence of a barrier timeout)</td>
</tr>
<tr>
- <th colspan="2" class="section"
id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span
style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following
metrics are applicable when Application Master High Availability is
enabled</span></th>
+ <th colspan="2" class="section"
id="job-coordinator-metadata-manager-metrics">org.apache.samza.job.metadata.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span
style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following
metrics are applicable when Application Master High Availability is
enabled</span></th>
</tr>
<tr>
<td>application-attempt-count</td>
diff --git
a/samza-api/src/main/java/org/apache/samza/job/JobMetadataChange.java
b/samza-api/src/main/java/org/apache/samza/job/JobMetadataChange.java
new file mode 100644
index 0000000..9048a2d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/JobMetadataChange.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job;
+
+/**
+ * Provides granularity into changes in job metadata (e.g. new job model, new
config).
+ */
+public enum JobMetadataChange {
+ // indicates a new deployment of a job
+ NEW_DEPLOYMENT,
+ // indicates a new job model compared to the previous job model
+ JOB_MODEL,
+ // indicates a new set of configs compared to the previous configs
+ CONFIG
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index e956413..b62678b 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -38,7 +38,7 @@ import org.apache.samza.container.ExecutionContainerIdManager;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.InputStreamsDiscoveredException;
-import org.apache.samza.coordinator.JobCoordinatorMetadataManager;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.MetadataResourceUtil;
import org.apache.samza.coordinator.PartitionChangeException;
@@ -369,7 +369,7 @@ public class ClusterBasedJobCoordinator {
JobCoordinatorMetadata previousMetadata =
jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
JobCoordinatorMetadata newMetadata =
jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, config);
- if (jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata)) {
+ if (!jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata).isEmpty()) {
jobCoordinatorMetadataManager.writeJobCoordinatorMetadata(newMetadata);
metadataChangedAcrossAttempts = true;
}
diff --git
a/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
b/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
new file mode 100644
index 0000000..91977a2
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/environment/EnvironmentVariables.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.environment;
+
+/**
+ * Contains environment variables that are used by Samza components. This
provides a common place to put these variables
+ * and provide documentation for them.
+ */
+public class EnvironmentVariables {
+ /**
+ * The properties of the epoch identifier are as follows
+ * 1. Unique across applications in the cluster
+ * 2. Remains unchanged within a single deployment lifecycle
+ * 3. Remains unchanged across application attempt within a single
deployment lifecycle
+ * 4. Changes across deployment lifecycle
+ *
+ * If using a non-YARN environment, then this needs to be filled in so that
the job coordinator can properly manage
+ * changes (e.g. job model changes) within and across deployments.
+ * See JobCoordinatorMetadataManager.fetchEpochIdForJobCoordinator for more
details about the YARN alternative to this
+ * environment variable.
+ */
+ public static final String SAMZA_EPOCH_ID = "SAMZA_EPOCH_ID";
+}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
b/samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
similarity index 86%
rename from
samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
rename to
samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
index 7e40382..bc61a7d 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
+++
b/samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.samza.coordinator;
+package org.apache.samza.job.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
@@ -25,15 +25,19 @@ import com.google.common.collect.ImmutableSortedSet;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;
import com.google.common.hash.PrimitiveSink;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import org.apache.samza.SamzaException;
+import org.apache.samza.environment.EnvironmentVariables;
import org.apache.samza.config.Config;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.Gauge;
@@ -77,7 +81,7 @@ public class JobCoordinatorMetadataManager {
}
/**
- * Generates {@link JobCoordinatorMetadata} for the {@link JobCoordinator}.
+ * Generates {@link JobCoordinatorMetadata}.
*
* Epoch ID - It is generated by {@link #fetchEpochIdForJobCoordinator()}.
Refer to the javadocs for more
* details on how it is generated and the properties of the identifier.
@@ -96,7 +100,6 @@ public class JobCoordinatorMetadataManager {
* Both config and job model identifiers should a 32 bit integer.
*
* @param jobModel job model used for generating the metadata
- * @param config config used for generating the metadata
*
* @return the metadata for the job coordinator
*/
@@ -134,29 +137,34 @@ public class JobCoordinatorMetadataManager {
*
* @return true if metadata changed, false otherwise
*/
- public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata,
JobCoordinatorMetadata previousMetadata) {
- boolean changed = true;
-
- if (previousMetadata == null) {
- metrics.setNewDeployment(1);
- } else if
(!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+ public Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata
newMetadata,
+ JobCoordinatorMetadata previousMetadata) {
+ Set<JobMetadataChange> changes = new HashSet<>();
+ boolean newDeployment = false;
+ if (previousMetadata == null ||
!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
metrics.setNewDeployment(1);
- } else if
(!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
- metrics.setJobModelChangedAcrossApplicationAttempt(1);
- } else if
(!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
- metrics.setConfigChangedAcrossApplicationAttempt(1);
- } else {
- changed = false;
- metrics.incrementApplicationAttemptCount();
+ newDeployment = true;
+ changes.add(JobMetadataChange.NEW_DEPLOYMENT);
}
-
- if (changed) {
- LOG.info("Job coordinator metadata changed from: {} to: {}",
previousMetadata, newMetadata);
- } else {
+ if (previousMetadata == null ||
!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+ if (!newDeployment) {
+ metrics.setJobModelChangedAcrossApplicationAttempt(1);
+ }
+ changes.add(JobMetadataChange.JOB_MODEL);
+ }
+ if (previousMetadata == null ||
!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+ if (!newDeployment) {
+ metrics.setConfigChangedAcrossApplicationAttempt(1);
+ }
+ changes.add(JobMetadataChange.CONFIG);
+ }
+ if (changes.isEmpty()) {
LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+ metrics.incrementApplicationAttemptCount();
+ } else {
+ LOG.info("Job coordinator metadata changed from: {} to: {}",
previousMetadata, newMetadata);
}
-
- return changed;
+ return changes;
}
/**
@@ -207,29 +215,33 @@ public class JobCoordinatorMetadataManager {
}
/**
- * Generate the epoch id using the execution container id that is passed
through system environment. This isn't ideal
- * way of generating this ID and we will need some contract between the
underlying cluster manager and samza engine
- * around what the epoch ID should be like and what is needed to generate is
across different cluster offerings.
- * Due to unknowns defined above, we leave it as is and keep it simple for
now. It is favorable to keep it this way
- * instead of introducing a loosely defined interface/API and marking it
unstable.
- *
* The properties of the epoch identifier are as follows
* 1. Unique across applications in the cluster
* 2. Remains unchanged within a single deployment lifecycle
* 3. Remains unchanged across application attempt within a single
deployment lifecycle
* 4. Changes across deployment lifecycle
*
- * Note: The above properties is something we want keep intact when
extracting this into a well defined interface
- * or contract for YARN AM HA to work.
- * The format and property used to generate ID is specific to YARN and the
specific format of the container name
- * is a public contract by YARN which is likely to remain backward
compatible.
+ * For YARN environment:
+ * Generate the epoch id using the execution container id that is passed
through system environment. This isn't ideal
+ * way of generating this ID, since it is YARN-specific. This is left as a
legacy flow for backwards compatibility, as
+ * the original implementation did not define a cluster-agnostic contract.
+ * The format and property used to generate ID is specific to YARN and the
specific format of the container name
+ * is a public contract by YARN which is likely to remain backward
compatible.
+ *
+ * For non-YARN environments:
+ * Extract this from the environment variable SAMZA_EPOCH_ID. This is a more
generic way of obtaining an epoch id, but
+ * it does require the resource management layer to inject this value.
*
* @return an identifier associated with the job coordinator satisfying the
above properties
*/
@VisibleForTesting
String fetchEpochIdForJobCoordinator() {
- String[] containerIdParts =
getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
- return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
+ if (ClusterType.YARN.equals(this.clusterType)) {
+ String[] containerIdParts =
getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
+ return containerIdParts[1] + CONTAINER_ID_DELIMITER +
containerIdParts[2];
+ } else {
+ return getEnvProperty(EnvironmentVariables.SAMZA_EPOCH_ID);
+ }
}
@VisibleForTesting
@@ -267,7 +279,8 @@ public class JobCoordinatorMetadataManager {
* Type of the cluster deployment associated with the {@link
JobCoordinatorMetadataManager}
*/
public enum ClusterType {
- YARN
+ YARN,
+ NON_YARN
}
/**
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index e0b0739..6eeb119 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -20,6 +20,7 @@
package org.apache.samza.clustermanager;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -32,7 +33,8 @@ import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
-import org.apache.samza.coordinator.JobCoordinatorMetadataManager;
+import org.apache.samza.job.metadata.JobCoordinatorMetadataManager;
+import org.apache.samza.job.JobMetadataChange;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
@@ -249,7 +251,8 @@ public class TestClusterBasedJobCoordinator {
when(jobCoordinatorMetadataManager.readJobCoordinatorMetadata()).thenReturn(previousMetadata);
when(jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(any(),
any())).thenReturn(newMetadata);
- when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata)).thenReturn(false);
+ when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata)).thenReturn(
+ ImmutableSet.of());
when(clusterBasedJobCoordinator.createJobCoordinatorMetadataManager()).thenReturn(jobCoordinatorMetadataManager);
/*
@@ -264,7 +267,8 @@ public class TestClusterBasedJobCoordinator {
/*
* Verify if there are changes to metadata, we persist the new metadata &
update the metadata changed flag
*/
- when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata)).thenReturn(true);
+ when(jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata)).thenReturn(
+ ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT));
clusterBasedJobCoordinator.generateAndUpdateJobCoordinatorMetadata(mockJobModel);
assertTrue("JC metadata changed should be true",
clusterBasedJobCoordinator.isMetadataChangedAcrossAttempts());
verify(jobCoordinatorMetadataManager,
times(1)).writeJobCoordinatorMetadata(newMetadata);
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
b/samza-core/src/test/java/org/apache/samza/job/metadata/TestJobCoordinatorMetadataManager.java
similarity index 59%
rename from
samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
rename to
samza-core/src/test/java/org/apache/samza/job/metadata/TestJobCoordinatorMetadataManager.java
index b1739d9..c4ad179 100644
---
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobCoordinatorMetadataManager.java
+++
b/samza-core/src/test/java/org/apache/samza/job/metadata/TestJobCoordinatorMetadataManager.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.samza.coordinator;
+package org.apache.samza.job.metadata;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
@@ -32,6 +33,7 @@ import
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStrea
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.JobMetadataChange;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
@@ -41,11 +43,10 @@ import org.apache.samza.serializers.Serde;
import org.junit.Before;
import org.junit.Test;
-import static
org.apache.samza.coordinator.JobCoordinatorMetadataManager.ClusterType;
-import static
org.apache.samza.coordinator.JobCoordinatorMetadataManager.CONTAINER_ID_DELIMITER;
-import static
org.apache.samza.coordinator.JobCoordinatorMetadataManager.CONTAINER_ID_PROPERTY;
+import static
org.apache.samza.job.metadata.JobCoordinatorMetadataManager.CONTAINER_ID_DELIMITER;
+import static
org.apache.samza.job.metadata.JobCoordinatorMetadataManager.CONTAINER_ID_PROPERTY;
+import static
org.apache.samza.job.metadata.JobCoordinatorMetadataManager.ClusterType;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -108,46 +109,119 @@ public class TestJobCoordinatorMetadataManager {
}
@Test
- public void testCheckForMetadataChanges() {
- JobCoordinatorMetadata previousMetadata = new
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
- JobCoordinatorMetadata newMetadataWithDifferentEpochId =
- new JobCoordinatorMetadata(NEW_EPOCH_ID, OLD_CONFIG_ID,
OLD_JOB_MODEL_ID);
-
- JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics metrics
=
- jobCoordinatorMetadataManager.getMetrics();
+ public void testCheckForMetadataChangesNoPreviousMetadata() {
+ JobCoordinatorMetadata newMetadata = new
JobCoordinatorMetadata(NEW_EPOCH_ID, NEW_CONFIG_ID, NEW_JOB_MODEL_ID);
+ Set<JobMetadataChange> metadataChanges =
+
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, null);
+ assertEquals("Metadata check should indicate all changes",
+ ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT,
JobMetadataChange.JOB_MODEL, JobMetadataChange.CONFIG),
+ metadataChanges);
+ assertEquals("New deployment should be 1 since Epoch ID changed", 1,
+
this.jobCoordinatorMetadataManager.getMetrics().getNewDeployment().getValue().intValue());
+ assertEquals("Job model changed across application attempts should be 0",
0,
+ this.jobCoordinatorMetadataManager.getMetrics()
+ .getJobModelChangedAcrossApplicationAttempt()
+ .getValue()
+ .intValue());
+ assertEquals("Config changed across application attempts should be 0", 0,
+ this.jobCoordinatorMetadataManager.getMetrics()
+ .getConfigChangedAcrossApplicationAttempt()
+ .getValue()
+ .intValue());
+ }
- boolean metadataChanged =
-
jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata,
newMetadataWithDifferentEpochId);
- assertTrue("Metadata check should return true", metadataChanged);
+ @Test
+ public void testCheckForMetadataChangesNewDeployment() {
+ JobCoordinatorMetadata previousMetadata = new
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+ JobCoordinatorMetadata newMetadata = new
JobCoordinatorMetadata(NEW_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+ Set<JobMetadataChange> metadataChanges =
+
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata);
+ assertEquals("Metadata check should indicate new deployment",
ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT),
+ metadataChanges);
assertEquals("New deployment should be 1 since Epoch ID changed", 1,
- metrics.getNewDeployment().getValue().intValue());
-
- JobCoordinatorMetadata newMetadataWithDifferentConfigId =
- new JobCoordinatorMetadata(OLD_EPOCH_ID, NEW_CONFIG_ID,
OLD_JOB_MODEL_ID);
- metadataChanged =
-
jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata,
newMetadataWithDifferentConfigId);
- assertTrue("Metadata check should return true", metadataChanged);
- assertEquals("Config across application attempts should be 1", 1,
-
metrics.getConfigChangedAcrossApplicationAttempt().getValue().intValue());
-
- JobCoordinatorMetadata newMetadataWithDifferentJobModelId =
- new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID,
NEW_JOB_MODEL_ID);
- metadataChanged =
-
jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata,
newMetadataWithDifferentJobModelId);
- assertTrue("Metadata check should return true", metadataChanged);
+
this.jobCoordinatorMetadataManager.getMetrics().getNewDeployment().getValue().intValue());
+ assertEquals("Application attempt count should be 0", 0,
+
this.jobCoordinatorMetadataManager.getMetrics().getApplicationAttemptCount().getValue().intValue());
+ }
+
+ @Test
+ public void testCheckForMetadataChangesJobModelChange() {
+ JobCoordinatorMetadata previousMetadata = new
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+ JobCoordinatorMetadata newMetadata = new
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, NEW_JOB_MODEL_ID);
+ Set<JobMetadataChange> metadataChanges =
+
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata);
+ assertEquals("Metadata check should indicate new job model",
ImmutableSet.of(JobMetadataChange.JOB_MODEL),
+ metadataChanges);
assertEquals("Job model changed across application attempts should be 1",
1,
-
metrics.getJobModelChangedAcrossApplicationAttempt().getValue().intValue());
+ this.jobCoordinatorMetadataManager.getMetrics()
+ .getJobModelChangedAcrossApplicationAttempt()
+ .getValue()
+ .intValue());
+ assertEquals("Application attempt count should be 0", 0,
+
this.jobCoordinatorMetadataManager.getMetrics().getApplicationAttemptCount().getValue().intValue());
+ }
- JobCoordinatorMetadata newMetadataWithNoChange =
- new JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID,
OLD_JOB_MODEL_ID);
+ @Test
+ public void testCheckForMetadataChangesConfigChange() {
+ JobCoordinatorMetadata previousMetadata = new
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+ JobCoordinatorMetadata newMetadata = new
JobCoordinatorMetadata(OLD_EPOCH_ID, NEW_CONFIG_ID, OLD_JOB_MODEL_ID);
+ Set<JobMetadataChange> metadataChanges =
+
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata);
+ assertEquals("Metadata check should indicate new config",
ImmutableSet.of(JobMetadataChange.CONFIG),
+ metadataChanges);
+ assertEquals("Config changed across application attempts should be 1", 1,
+ this.jobCoordinatorMetadataManager.getMetrics()
+ .getConfigChangedAcrossApplicationAttempt()
+ .getValue()
+ .intValue());
assertEquals("Application attempt count should be 0", 0,
- metrics.getApplicationAttemptCount().getValue().intValue());
+
this.jobCoordinatorMetadataManager.getMetrics().getApplicationAttemptCount().getValue().intValue());
+ }
- metadataChanged =
-
jobCoordinatorMetadataManager.checkForMetadataChanges(previousMetadata,
newMetadataWithNoChange);
- assertFalse("Metadata check should return false", metadataChanged);
+ @Test
+ public void testCheckForMetadataChangesMultipleChanges() {
+ JobCoordinatorMetadata previousMetadata = new
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+ JobCoordinatorMetadata newMetadata = new
JobCoordinatorMetadata(NEW_EPOCH_ID, NEW_CONFIG_ID, NEW_JOB_MODEL_ID);
+ Set<JobMetadataChange> metadataChanges =
+
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata);
+ assertEquals("Metadata check should indicate all changes",
+ ImmutableSet.of(JobMetadataChange.NEW_DEPLOYMENT,
JobMetadataChange.JOB_MODEL, JobMetadataChange.CONFIG),
+ metadataChanges);
+ assertEquals("New deployment should be 1 since Epoch ID changed", 1,
+
this.jobCoordinatorMetadataManager.getMetrics().getNewDeployment().getValue().intValue());
+ assertEquals("Job model changed across application attempts should be 0",
0,
+ this.jobCoordinatorMetadataManager.getMetrics()
+ .getJobModelChangedAcrossApplicationAttempt()
+ .getValue()
+ .intValue());
+ assertEquals("Config changed across application attempts should be 0", 0,
+ this.jobCoordinatorMetadataManager.getMetrics()
+ .getConfigChangedAcrossApplicationAttempt()
+ .getValue()
+ .intValue());
+ }
+
+ @Test
+ public void testCheckForMetadataChangesNoChanges() {
+ JobCoordinatorMetadata previousMetadata = new
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+ JobCoordinatorMetadata newMetadata = new
JobCoordinatorMetadata(OLD_EPOCH_ID, OLD_CONFIG_ID, OLD_JOB_MODEL_ID);
+ Set<JobMetadataChange> metadataChanges =
+
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata,
previousMetadata);
+ assertEquals("Metadata check should indicate no changes",
ImmutableSet.of(), metadataChanges);
+ assertEquals("New deployment should be 0 since Epoch ID did not change", 0,
+
this.jobCoordinatorMetadataManager.getMetrics().getNewDeployment().getValue().intValue());
+ assertEquals("Job model changed across application attempts should be 0",
0,
+ this.jobCoordinatorMetadataManager.getMetrics()
+ .getJobModelChangedAcrossApplicationAttempt()
+ .getValue()
+ .intValue());
+ assertEquals("Config changed across application attempts should be 0", 0,
+ this.jobCoordinatorMetadataManager.getMetrics()
+ .getConfigChangedAcrossApplicationAttempt()
+ .getValue()
+ .intValue());
assertEquals("Application attempt count should be 1", 1,
- metrics.getApplicationAttemptCount().getValue().intValue());
+
this.jobCoordinatorMetadataManager.getMetrics().getApplicationAttemptCount().getValue().intValue());
}
@Test