cameronlee314 commented on a change in pull request #1519:
URL: https://github.com/apache/samza/pull/1519#discussion_r691533650



##########
File path: samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
##########
@@ -134,29 +135,34 @@ public JobCoordinatorMetadata generateJobCoordinatorMetadata(JobModel jobModel,
    *
    * @return true if metadata changed, false otherwise
    */
-  public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, JobCoordinatorMetadata previousMetadata) {
-    boolean changed = true;
-
-    if (previousMetadata == null) {
-      metrics.setNewDeployment(1);
-    } else if (!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+  public Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata,
+      JobCoordinatorMetadata previousMetadata) {
+    Set<JobMetadataChange> changes = new HashSet<>();
+    boolean newDeployment = false;
+    if (previousMetadata == null || !previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
       metrics.setNewDeployment(1);
-    } else if (!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
-      metrics.setJobModelChangedAcrossApplicationAttempt(1);
-    } else if (!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
-      metrics.setConfigChangedAcrossApplicationAttempt(1);
-    } else {
-      changed = false;
-      metrics.incrementApplicationAttemptCount();
+      newDeployment = true;
+      changes.add(JobMetadataChange.NEW_DEPLOYMENT);
     }
-
-    if (changed) {
-      LOG.info("Job coordinator metadata changed from: {} to: {}", previousMetadata, newMetadata);
-    } else {
+    if (previousMetadata == null || !previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+      if (!newDeployment) {
+        metrics.setJobModelChangedAcrossApplicationAttempt(1);
+      }
+      changes.add(JobMetadataChange.JOB_MODEL);
+    }
+    if (previousMetadata == null || !previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+      if (!newDeployment) {
+        metrics.setConfigChangedAcrossApplicationAttempt(1);
+      }
+      changes.add(JobMetadataChange.CONFIG);
+    }
+    if (changes.isEmpty()) {
       LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+      metrics.incrementApplicationAttemptCount();
+    } else {
+      LOG.info("Job coordinator metadata changed from: {} to: {}", previousMetadata, newMetadata);

Review comment:
       IIRC, I did have that early-termination check initially, but I liked how the current approach has a single
explicit condition for each type of change. I was considering the case where someone adds a new `JobMetadataChange`:
they will be forced to add a new condition, which helps ensure it gets handled properly. With early termination, a new
enum value might get automatically lumped in with an existing branch, and that might be undesirable.
   I don't think early termination eliminates the check for a new deployment, due to the semantics of the metrics.
`previousMetadata == null` doesn't cover the case of the epoch id changing, but an epoch id change doesn't necessarily
mean a job model change.
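   To make the trade-off concrete, here is a minimal sketch of how a caller might consume the returned set; the
caller-side handling is purely illustrative, not the actual coordinator code:
```java
// Illustrative fragment only: reacting to the change set returned by
// checkForMetadataChanges (method and enum values taken from the diff above).
Set<JobMetadataChange> changes =
    jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata);
if (changes.contains(JobMetadataChange.NEW_DEPLOYMENT)) {
  // no previous metadata, or the epoch id changed: treat as a fresh deployment
} else if (changes.contains(JobMetadataChange.JOB_MODEL)) {
  // same deployment, but the job model changed across application attempts
} else if (changes.contains(JobMetadataChange.CONFIG)) {
  // same deployment and job model, but the config changed across attempts
}
// A new JobMetadataChange value would force a new, explicit branch here as well.
```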

##########
File path: samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
##########
@@ -207,29 +213,33 @@ public void writeJobCoordinatorMetadata(JobCoordinatorMetadata metadata) {
   }
 
   /**
-   * Generate the epoch id using the execution container id that is passed through system environment. This isn't ideal
-   * way of generating this ID and we will need some contract between the underlying cluster manager and samza engine
-   * around what the epoch ID should be like and what is needed to generate is across different cluster offerings.
-   * Due to unknowns defined above, we leave it as is and keep it simple for now. It is favorable to keep it this way
-   * instead of introducing a loosely defined interface/API and marking it unstable.
-   *
    * The properties of the epoch identifier are as follows
    *  1. Unique across applications in the cluster
    *  2. Remains unchanged within a single deployment lifecycle
    *  3. Remains unchanged across application attempt within a single deployment lifecycle
    *  4. Changes across deployment lifecycle
    *
-   *  Note: The above properties is something we want keep intact when extracting this into a well defined interface
-   *  or contract for YARN AM HA to work.
-   *  The format and property used to generate ID is specific to YARN and the specific format of the container name
-   *  is a public contract by YARN which is likely to remain backward compatible.
+   * For YARN environments:
+   * Generate the epoch id using the execution container id that is passed through the system environment. This isn't
+   * an ideal way of generating this ID, since it is YARN-specific. This is left as a legacy flow for backwards
+   * compatibility, as the original implementation did not define a cluster-agnostic contract.
+   * The format and property used to generate the ID are specific to YARN, and the specific format of the container
+   * name is a public contract by YARN which is likely to remain backward compatible.
+   *
+   * For non-YARN environments:
+   * Extract this from the environment variable SAMZA_EPOCH_ID. This is a more generic way of obtaining an epoch id,
+   * but it does require the resource management layer to inject this value.
    *
    * @return an identifier associated with the job coordinator satisfying the above properties
    */
   @VisibleForTesting
   String fetchEpochIdForJobCoordinator() {
-    String[] containerIdParts = getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
-    return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
+    if (ClusterType.YARN.equals(this.clusterType)) {
+      String[] containerIdParts = getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
+      return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
+    } else {
+      return getEnvProperty("SAMZA_EPOCH_ID");

Review comment:
       Good point, I'll improve docs on this.
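   For readers of the thread, a minimal sketch of what the YARN branch extracts; the container id value below is
hypothetical, and the assumed format (container_<clusterTimestamp>_<appId>_<attemptId>_<containerId>) may not cover
every YARN container id variant:
```java
// Hypothetical container id in the common YARN format.
String containerId = "container_1606797336059_0010_01_000001";
String[] parts = containerId.split("_");
// parts[1] + "_" + parts[2] -> "1606797336059_0010": the cluster timestamp plus
// application id, which stays fixed across application attempts within one
// deployment and changes on a new deployment, matching the properties above.
String epochId = parts[1] + "_" + parts[2];

// Non-YARN clusters are expected to inject the id directly instead:
String injectedEpochId = System.getenv("SAMZA_EPOCH_ID");
```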

##########
File path: samza-core/src/main/java/org/apache/samza/job/metadata/JobMetadataChange.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.metadata;
+
+/**
+ * Provides granularity into changes in job metadata (e.g. new job model, new config).
+ */
+public enum JobMetadataChange {

Review comment:
       To me, this doesn't seem like an actual API in the current state of the code. Coordinator implementations use
this object, but if that is the categorization for "API", then you would also need to include any of the other classes
that a coordinator could use (e.g. `StreamMetadataCache`, `StreamPartitionCountMonitor`,
`JobCoordinatorMetadataManager`, etc.), so just moving a couple of classes doesn't really accomplish much.
   I do think there is a lot of opportunity to refactor these classes into modules so that such components end up in a
more explicit "API" module, but that is much bigger in scope than this PR.

##########
File path: samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -368,8 +368,8 @@ void generateAndUpdateJobCoordinatorMetadata(JobModel jobModel) {
     JobCoordinatorMetadataManager jobCoordinatorMetadataManager = createJobCoordinatorMetadataManager();
 
     JobCoordinatorMetadata previousMetadata = jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
-    JobCoordinatorMetadata newMetadata = jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, config);

Review comment:
       Oh, it's pretty misleading that the config isn't always fully filled in, and it's worse now that `JobModel` is
part of the API.
   I thought I was just doing a minor clean-up, so this doesn't need to be changed.
   Regarding your note about `JobModel` being a container for config: `JobModel` is the single object that gets
coordinated across containers, and it still feels like config might be something that should be coordinated between
containers, so I'm not totally convinced that config should be pulled out of `JobModel`. Anyway, that is out of scope
for this PR.
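   To make the comparison concrete, a hedged sketch of the two call shapes being discussed; only the explicit-config
call appears in the diff above, and the getConfig() accessor on `JobModel` is an assumption for illustration:
```java
// Passing config explicitly, as in the line removed above (kept per this comment):
JobCoordinatorMetadata metadata =
    jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, config);

// The alternative of deriving config from the JobModel itself (assumes a
// getConfig() accessor) is only safe if that config is always fully populated,
// which the discussion above says is not guaranteed.
JobCoordinatorMetadata fromJobModel =
    jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, jobModel.getConfig());
```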
   



