lakshmi-manasa-g commented on a change in pull request #1449:
URL: https://github.com/apache/samza/pull/1449#discussion_r538624834



##########
File path: 
samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job;
+
+import java.util.Objects;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * A data model to represent the metadata of the job coordinator. The metadata 
refers to attributes of job coordinator
+ * scoped to attempt within a deployment. For the purpose of this data model, 
deployment and attempt are defined
+ * as follows
+ *
+ * Deployment - Set of actions to stop an existing application, install new 
binaries and submit a request to run the new binaries
+ * Attempt    - Incarnations of application within a deployment for fault 
tolerance; e.g. Job coordinator failures or
+ *              job model changes detected by partition count monitoring or 
regex monitor.
+ *
+ * Metadata generation may require interaction with the underlying cluster manager. The following describes
+ * the properties of the attributes to provide guidelines for implementors of contracts related to metadata generation.
+ *
+ * Epoch ID - An identifier associated with the job coordinator's lifecycle within the scope of a single deployment.
+ * The properties of the epoch identifier are as follows
+ *    1. Unique across applications in the cluster
+ *    2. Remains unchanged within a single deployment lifecycle
+ *    3. Remains unchanged across application attempts within a single deployment lifecycle
+ *    4. Changes across deployment lifecycle
+ *
+ * Config ID - An identifier associated with a subset of configuration 
snapshot used by the job in an application attempt.
+ * The only prefix that currently impacts the identifier is job.autosizing.*
+ * The properties of the config identifier are as follows
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the subset 
of configuration remains unchanged.
+ *
+ * Job Model ID - An identifier associated with the JobModel used by the job in an application attempt. JobModel
+ * has both configurations and a list of container models. We don't account for changes in the configuration as
+ * part of this identifier since it is separately tracked and handled by Config ID.
+ * The properties of the job model identifier are as follows
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the work 
assignment remains unchanged
+ *
+ * Notes on interface stability - It is used internally by Samza for job 
coordinator high availability in YARN
+ * deployment offering. It may evolve depending on expanding the scope beyond 
YARN and hence unstable.
+ *
+ */
[email protected]
+public class JobCoordinatorMetadata {
+  private final String configId;
+  private final String epochId;
+  private final String jobModelId;
+
+  public JobCoordinatorMetadata(String epochId, String configId, String 
jobModelId) {
+    this.configId = configId;
+    this.epochId = epochId;
+    this.jobModelId = jobModelId;
+  }
+
+  public String getConfigId() {
+    return configId;
+  }
+
+  public String getEpochId() {
+    return this.epochId;
+  }
+
+  public String getJobModelId() {
+    return jobModelId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof JobCoordinatorMetadata)) {
+      return false;
+    }
+    JobCoordinatorMetadata metadata = (JobCoordinatorMetadata) o;
+    return Objects.equals(configId, metadata.configId) && 
Objects.equals(epochId, metadata.epochId)

Review comment:
       ah i see. got it.
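
The Config ID properties listed in the JobCoordinatorMetadata javadoc above (reproducible, and unchanged as long as the relevant subset of configuration is unchanged) can be illustrated with a minimal sketch. This is not the PR's implementation: the PR uses Guava's Hashing.crc32() with a ConfigHashFunnel whose body is not shown in this diff, while the sketch swaps in java.util.zip.CRC32 from the JDK so that it is self-contained. The class name ConfigIdSketch, the method configId, the "key=value" encoding, and the example keys are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical sketch, not the Samza implementation.
final class ConfigIdSketch {
  // Assumed from the javadoc, which names job.autosizing.* as the prefix that impacts the id.
  static final String PREFIX = "job.autosizing.";

  // Returns a 32-bit identifier (as a string) computed only from keys that start with PREFIX.
  // Assumes config values are non-null.
  static String configId(Map<String, String> config) {
    // A TreeMap fixes the iteration order, so the insertion order of the input does not matter.
    TreeMap<String, String> subset = new TreeMap<>();
    for (Map.Entry<String, String> entry : config.entrySet()) {
      if (entry.getKey().startsWith(PREFIX)) {
        subset.put(entry.getKey(), entry.getValue());
      }
    }

    CRC32 crc = new CRC32();
    for (Map.Entry<String, String> entry : subset.entrySet()) {
      // "key=value\n" is an arbitrary encoding chosen for this sketch.
      String line = entry.getKey() + "=" + entry.getValue() + "\n";
      crc.update(line.getBytes(StandardCharsets.UTF_8));
    }
    // CRC32.getValue() returns the checksum in the low 32 bits of a long.
    return String.valueOf((int) crc.getValue());
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("job.autosizing.container.count", "4"); // example key, used only for illustration
    config.put("job.name", "ignored-by-the-sketch");
    System.out.println("config id: " + configId(config));
  }
}
```

Under these assumptions, two configs that differ only in keys outside the prefix produce the same identifier, and changing a value under the prefix produces a different one.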

##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Hashing;
+import com.google.common.hash.PrimitiveSink;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import 
org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class to manage reads and writes of {@link JobCoordinatorMetadata} to {@link MetadataStore}. It also provides
+ * additional helper functionality to generate {@link JobCoordinatorMetadata} and check for changes across runs.
+ */
+public class JobCoordinatorMetadataManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobCoordinatorMetadataManager.class);
+  private static final String APPLICATION_ATTEMPT_COUNT = 
"applicationAttemptCount";
+  private static final String JOB_COORDINATOR_MANAGER_METRICS = 
"job-coordinator-manager";
+  private static final String JOB_MODEL_CHANGED = "jobModelChanged";
+  private static final String CONFIG_CHANGED = "configChanged";
+  private static final String NEW_DEPLOYMENT = "newDeployment";
+
+  static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID";
+  static final String CONTAINER_ID_DELIMITER = "_";
+
+  private final Counter applicationAttemptCount;
+  private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> configChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> newDeployment;
+  private final MetadataStore metadataStore;
+  private final ObjectMapper metadataMapper = 
SamzaObjectMapper.getObjectMapper();
+  private final Serde<String> valueSerde;
+  private final String clusterType;
+
+  public JobCoordinatorMetadataManager(MetadataStore metadataStore, String 
clusterType, MetricsRegistry metricsRegistry) {
+    Preconditions.checkState(StringUtils.isNotBlank(clusterType), "Cluster 
type cannot be empty");
+    this.clusterType = clusterType;
+    this.metadataStore = metadataStore;
+    this.valueSerde = new 
CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE);
+
+    applicationAttemptCount = 
metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, 
APPLICATION_ATTEMPT_COUNT);
+    configChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, 
CONFIG_CHANGED, 0);
+    jobModelChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, 
JOB_MODEL_CHANGED, 0);
+    newDeployment = metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, 
NEW_DEPLOYMENT, 0);
+  }
+
+  /**
+   * Generates {@link JobCoordinatorMetadata} for the {@link JobCoordinator}.
+   *
+   * Epoch ID - It is generated by {@link #fetchEpochIdForJobCoordinator()}. 
Refer to the javadocs for more
+   * details on how it is generated and the properties of the identifier.
+   *
+   * Config ID - A unique and reproducible identifier that is generated based 
on the input {@link Config}. It uses
+   * a {@link Funnel} to use a subset of the input configuration to generate 
the identifier and as long as the subset
+   * of the configuration remains same, the identifier is guaranteed to be 
same. For the list of config prefixes used
+   * by the funnel refer to {@link ConfigHashFunnel}
+   *
+   * JobModel ID - A unique and reproducible identifier that is generated 
based on the input {@link JobModel}. It only
+   * uses the {@link org.apache.samza.job.model.ContainerModel} within the 
{@linkplain JobModel} for generation. We
+   * serialize the data into bytes and use those bytes to compute the 
identifier.
+   *
+   * In case of YARN, the epoch identifier is extracted from the application 
attempt and translates to applicationId
+   * e.g. 1606797336059_0010
+   * Both config and job model identifiers should be 32-bit integers.
+   *
+   * @param jobModel job model used for generating the metadata
+   * @param config config used for generating the metadata
+   *
+   * @return the metadata for the job coordinator
+   */
+  public JobCoordinatorMetadata generateJobCoordinatorMetadata(JobModel 
jobModel, Config config) {
+    try {
+      int jobModelId = Hashing
+          .crc32c()
+          
.hashBytes(SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModel.getContainers()))
+          .asInt();
+      int configId = Hashing
+          .crc32()
+          .hashObject(config, new ConfigHashFunnel())
+          .asInt();
+
+      LOG.info("Generated job model id {} and config id {}", jobModelId, 
configId);
+      return new JobCoordinatorMetadata(fetchEpochIdForJobCoordinator(), 
String.valueOf(configId),
+          String.valueOf(jobModelId));
+    } catch (Exception e) {
+      LOG.error("Failed to generate metadata for the current attempt due to ", 
e);
+      throw new RuntimeException("Failed to generate the metadata for the 
current attempt due to ", e);
+    }
+  }
+
+  /**
+   * Check for changes between the metadata passed as inputs. Metadata is 
considered changed if any of the attributes within
+   * {@linkplain JobCoordinatorMetadata} changes.
+   *
+   * We intentionally check each attribute separately so that changes can be tracked at this granularity.
+   * We want to use this information to determine if complex handling is required to cater to these changes
+   * instead of blindly restarting all the containers upstream.
+   *
+   * @param newMetadata new metadata to be compared
+   * @param previousMetadata previous metadata to be compared against
+   *
+   * @return true if metadata changed, false otherwise
+   */
+  public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, 
JobCoordinatorMetadata previousMetadata) {
+    boolean changed = true;
+
+    if (previousMetadata == null) {
+      newDeployment.set(1);
+    } else if 
(!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+      newDeployment.set(1);
+    } else if 
(!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+      jobModelChangedAcrossApplicationAttempt.set(1);
+    } else if 
(!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+      configChangedAcrossApplicationAttempt.set(1);
+    } else {
+      changed = false;
+      applicationAttemptCount.inc();
+    }
+
+    if (changed) {
+      LOG.info("Job coordinator metadata changed from: {} to: {}", 
previousMetadata, newMetadata);
+    } else {
+      LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+    }
+
+    return changed;
+  }
+
+  /**
+   * Reads the {@link JobCoordinatorMetadata} from the metadata store. It 
fetches the metadata
+   * associated with cluster type specified at the creation of the manager.
+   *
+   * @return job coordinator metadata
+   */
+  public JobCoordinatorMetadata readJobCoordinatorMetadata() {
+    JobCoordinatorMetadata metadata = null;
+    for (Map.Entry<String, byte[]> entry : metadataStore.all().entrySet()) {
+      if (clusterType.equals(entry.getKey())) {
+        try {
+          String metadataString = valueSerde.fromBytes(entry.getValue());
+          metadata = metadataMapper.readValue(metadataString, 
JobCoordinatorMetadata.class);
+          break;
+        } catch (Exception e) {
+          LOG.error("Failed to read job coordinator metadata due to ", e);
+        }
+      }
+    }
+
+    LOG.info("Fetched the job coordinator metadata for cluster {} as {}.", 
clusterType, metadata);
+    return metadata;
+  }
+
+  /**
+   * Persist the {@link JobCoordinatorMetadata} in metadata store. The job 
coordinator metadata is associated
+   * with the cluster type specified at the creation of the manager.
+   *
+   * @param metadata metadata to be persisted
+   *
+   * @return true if the write succeeded, false otherwise
+   */
+  public boolean writeJobCoordinatorMetadata(JobCoordinatorMetadata metadata) {
+    Preconditions.checkNotNull(metadata, "Job coordinator metadata cannot be 
null");
+
+    boolean writeSucceeded = false;
+    try {
+      String metadataValueString = metadataMapper.writeValueAsString(metadata);
+      metadataStore.put(clusterType, valueSerde.toBytes(metadataValueString));
+      writeSucceeded = true;
+    } catch (Exception e) {
+      LOG.error("Failed to write the job coordinator metadata to metadata 
store due to ", e);

Review comment:
       Yeah, I meant that the new AM won't see the metadata properly when I said 
"previousMetadata".
   Thanks for addressing it.
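
The precedence used by checkForMetadataChanges in the diff above (missing previous metadata or a changed epoch first, then the job model, then the config) can be summarized in a small standalone sketch. It is a simplification, not the PR's code: it returns an enum instead of setting gauges and counters, uses Objects.equals rather than calling equals on the getters, and the names MetadataChangeSketch, Metadata, Change, and classify are hypothetical.

```java
import java.util.Objects;

// Hypothetical sketch of the comparison order; not the Samza implementation.
final class MetadataChangeSketch {
  enum Change { NEW_DEPLOYMENT, JOB_MODEL_CHANGED, CONFIG_CHANGED, NONE }

  // Minimal stand-in for JobCoordinatorMetadata with the same three attributes.
  static final class Metadata {
    final String epochId;
    final String configId;
    final String jobModelId;

    Metadata(String epochId, String configId, String jobModelId) {
      this.epochId = epochId;
      this.configId = configId;
      this.jobModelId = jobModelId;
    }
  }

  // Reports only the first difference found, mirroring the if / else-if chain in the diff.
  static Change classify(Metadata previous, Metadata current) {
    if (previous == null || !Objects.equals(previous.epochId, current.epochId)) {
      // No stored metadata, or a different epoch, is treated as a new deployment.
      return Change.NEW_DEPLOYMENT;
    }
    if (!Objects.equals(previous.jobModelId, current.jobModelId)) {
      return Change.JOB_MODEL_CHANGED;
    }
    if (!Objects.equals(previous.configId, current.configId)) {
      return Change.CONFIG_CHANGED;
    }
    return Change.NONE;
  }

  public static void main(String[] args) {
    Metadata previous = new Metadata("epoch-1", "config-1", "model-1");
    Metadata current = new Metadata("epoch-1", "config-2", "model-1");
    System.out.println(classify(previous, current));
  }
}
```

One consequence of this ordering is that when both the job model and the config change within the same epoch, only the job model change is reported.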




