mynameborat commented on a change in pull request #1519:
URL: https://github.com/apache/samza/pull/1519#discussion_r690861039



##########
File path: 
samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
##########
@@ -207,29 +213,33 @@ public void 
writeJobCoordinatorMetadata(JobCoordinatorMetadata metadata) {
   }
 
   /**
-   * Generate the epoch id using the execution container id that is passed 
through system environment. This isn't ideal
-   * way of generating this ID and we will need some contract between the 
underlying cluster manager and samza engine
-   * around what the epoch ID should be like and what is needed to generate is 
across different cluster offerings.
-   * Due to unknowns defined above, we leave it as is and keep it simple for 
now. It is favorable to keep it this way
-   * instead of introducing a loosely defined interface/API and marking it 
unstable.
-   *
    * The properties of the epoch identifier are as follows
    *  1. Unique across applications in the cluster
    *  2. Remains unchanged within a single deployment lifecycle
    *  3. Remains unchanged across application attempt within a single 
deployment lifecycle
    *  4. Changes across deployment lifecycle
    *
-   *  Note: The above properties is something we want keep intact when 
extracting this into a well defined interface
-   *  or contract for YARN AM HA to work.
-   *  The format and property used to generate ID is specific to YARN and the 
specific format of the container name
-   *  is a public contract by YARN which is likely to remain backward 
compatible.
+   * For YARN environment:
+   * Generate the epoch id using the execution container id that is passed 
through system environment. This isn't ideal
+   * way of generating this ID, since it is YARN-specific. This is left as a 
legacy flow for backwards compatibility, as
+   * the original implementation did not define a cluster-agnostic contract.
+   * The format and property used to generate ID is specific to YARN and the 
specific format of the container name
+   * is a public contract by YARN which is likely to remain backward 
compatible.
+   *
+   * For non-YARN environments:
+   * Extract this from the environment variable SAMZA_EPOCH_ID. This is a more 
generic way of obtaining an epoch id, but
+   * it does require the resource management layer to inject this value.
    *
    * @return an identifier associated with the job coordinator satisfying the 
above properties
    */
   @VisibleForTesting
   String fetchEpochIdForJobCoordinator() {
-    String[] containerIdParts = 
getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
-    return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
+    if (ClusterType.YARN.equals(this.clusterType)) {
+      String[] containerIdParts = 
getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
+      return containerIdParts[1] + CONTAINER_ID_DELIMITER + 
containerIdParts[2];
+    } else {
+      return getEnvProperty("SAMZA_EPOCH_ID");

Review comment:
       Is it fair to treat this as a configuration that operators of Samza, if 
not the end users, should be aware of? If so, can we document the expected 
properties and format/protocol of the value? 
   
   Unfortunately, we don't have a central place for the environment 
variables we pass to our system, so I am okay if you document it 
alongside the configurations or in some other place that is 
appropriate.
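   
   To make the request concrete, here is a minimal sketch of the kind of contract that could be documented for this variable. Only the variable name `SAMZA_EPOCH_ID` comes from the diff above; the class `EpochIdSketch`, the helper `readEpochId`, and the "must be non-blank" rule are assumptions for illustration and are not part of Samza.

```java
import java.util.Map;

final class EpochIdSketch {
  // Variable name taken from the diff above.
  static final String EPOCH_ID_ENV = "SAMZA_EPOCH_ID";

  /**
   * Hypothetical helper: reads the epoch id from the supplied environment map
   * and fails fast when the resource management layer did not inject it.
   * The non-blank requirement is an assumed rule, not a documented one.
   */
  static String readEpochId(Map<String, String> env) {
    String epochId = env.get(EPOCH_ID_ENV);
    if (epochId == null || epochId.trim().isEmpty()) {
      throw new IllegalStateException(
          EPOCH_ID_ENV + " must be set by the resource management layer");
    }
    return epochId;
  }
}
```

   In a real process the map would be `System.getenv()`; taking it as a parameter just keeps the sketch easy to test.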

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -368,8 +368,8 @@ void generateAndUpdateJobCoordinatorMetadata(JobModel 
jobModel) {
     JobCoordinatorMetadataManager jobCoordinatorMetadataManager = 
createJobCoordinatorMetadataManager();
 
     JobCoordinatorMetadata previousMetadata = 
jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
-    JobCoordinatorMetadata newMetadata = 
jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, config);

Review comment:
       There are scenarios where the job model object doesn't have config 
populated within it, e.g., standalone.
   
   Hence the signature that explicitly passes config around. IMO, `JobModel` 
shouldn't be a container for configuration; I had to keep it that way for 
back-compat reasons in YARN (a single servlet exposes the configuration + 
JobModel to workers).
   
   That said, the current model constrains updating one independently of the 
other and forces us to update the container object `JobModel` as a whole. In 
YARN, this isn't a problem, because the two share a lifecycle. 
   
   Do you have any strong reasons behind making this change? If not, I'd 
prefer to keep it the way it is, so that we keep the flexibility to not 
depend on the config within `JobModel` and potentially make progress toward 
extracting it from `JobModel`.

##########
File path: 
samza-core/src/main/java/org/apache/samza/job/metadata/JobMetadataChange.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.metadata;
+
+/**
+ * Provides granularity into changes in job metadata (e.g. new job model, new 
config).
+ */
+public enum JobMetadataChange {

Review comment:
       Can we move this under samza-api along with `JobCoordinatorMetadata`? 

##########
File path: 
samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
##########
@@ -134,29 +135,34 @@ public JobCoordinatorMetadata 
generateJobCoordinatorMetadata(JobModel jobModel,
    *
    * @return true if metadata changed, false otherwise
    */
-  public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, 
JobCoordinatorMetadata previousMetadata) {
-    boolean changed = true;
-
-    if (previousMetadata == null) {
-      metrics.setNewDeployment(1);
-    } else if 
(!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+  public Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata 
newMetadata,
+      JobCoordinatorMetadata previousMetadata) {
+    Set<JobMetadataChange> changes = new HashSet<>();
+    boolean newDeployment = false;
+    if (previousMetadata == null || 
!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
       metrics.setNewDeployment(1);
-    } else if 
(!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
-      metrics.setJobModelChangedAcrossApplicationAttempt(1);
-    } else if 
(!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
-      metrics.setConfigChangedAcrossApplicationAttempt(1);
-    } else {
-      changed = false;
-      metrics.incrementApplicationAttemptCount();
+      newDeployment = true;
+      changes.add(JobMetadataChange.NEW_DEPLOYMENT);
     }
-
-    if (changed) {
-      LOG.info("Job coordinator metadata changed from: {} to: {}", 
previousMetadata, newMetadata);
-    } else {
+    if (previousMetadata == null || 
!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+      if (!newDeployment) {
+        metrics.setJobModelChangedAcrossApplicationAttempt(1);
+      }
+      changes.add(JobMetadataChange.JOB_MODEL);
+    }
+    if (previousMetadata == null || 
!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+      if (!newDeployment) {
+        metrics.setConfigChangedAcrossApplicationAttempt(1);
+      }
+      changes.add(JobMetadataChange.CONFIG);
+    }
+    if (changes.isEmpty()) {
       LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+      metrics.incrementApplicationAttemptCount();
+    } else {
+      LOG.info("Job coordinator metadata changed from: {} to: {}", 
previousMetadata, newMetadata);

Review comment:
       Thoughts on early termination when `previousMetadata` is `null`? I am 
not a fan of multiple return statements in general, but it seems to help with 
readability in this case and eliminates the need to check for a new deployment 
in every branch.
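   
   One possible shape for the early return, as a minimal sketch. The `Change` enum and `Metadata` class are simplified stand-ins for `JobMetadataChange` and `JobCoordinatorMetadata`, and the metrics and logging calls from the diff are left out, so this only illustrates the control flow and is not a drop-in replacement.

```java
import java.util.EnumSet;
import java.util.Set;

final class MetadataChangeSketch {
  // Stand-in for JobMetadataChange from the diff above.
  enum Change { NEW_DEPLOYMENT, JOB_MODEL, CONFIG }

  // Hypothetical, simplified stand-in for JobCoordinatorMetadata.
  static final class Metadata {
    final String epochId;
    final String jobModelId;
    final String configId;

    Metadata(String epochId, String jobModelId, String configId) {
      this.epochId = epochId;
      this.jobModelId = jobModelId;
      this.configId = configId;
    }
  }

  static Set<Change> checkForChanges(Metadata newMetadata, Metadata previousMetadata) {
    // No previous metadata: treat everything as changed and return early,
    // matching the diff, where every branch fires when previousMetadata is null.
    if (previousMetadata == null) {
      return EnumSet.allOf(Change.class);
    }

    // From here on previousMetadata is non-null, so no per-branch null checks.
    Set<Change> changes = EnumSet.noneOf(Change.class);
    if (!previousMetadata.epochId.equals(newMetadata.epochId)) {
      changes.add(Change.NEW_DEPLOYMENT);
    }
    if (!previousMetadata.jobModelId.equals(newMetadata.jobModelId)) {
      changes.add(Change.JOB_MODEL);
    }
    if (!previousMetadata.configId.equals(newMetadata.configId)) {
      changes.add(Change.CONFIG);
    }
    return changes;
  }
}
```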




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

