lakshmi-manasa-g commented on a change in pull request #1449:
URL: https://github.com/apache/samza/pull/1449#discussion_r537918515



##########
File path: samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job;
+
+import java.util.Objects;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * A data model to represent the metadata of the job coordinator. The metadata refers to attributes of the job
+ * coordinator scoped to an attempt within a deployment. For the purpose of this data model, deployment and attempt
+ * are defined as follows:
+ *
+ * Deployment - Set of actions to stop an existing application, install new binaries and submit a request to run the new binaries
+ * Attempt    - Incarnations of the application within a deployment for fault tolerance; e.g. job coordinator failures or
+ *              job model changes detected by the partition count monitor or regex monitor.
+ *
+ * Metadata generation may require interaction with the underlying cluster manager. The following describes the
+ * properties of the attributes to provide guidelines for implementors of contracts related to metadata generation.
+ *
+ * Epoch ID - An identifier associated with the job coordinator's lifecycle within the scope of a single deployment.
+ * The properties of the epoch identifier are as follows:
+ *    1. Unique across applications in the cluster
+ *    2. Remains unchanged within a single deployment lifecycle
+ *    3. Remains unchanged across application attempts within a single deployment lifecycle
+ *    4. Changes across deployment lifecycles
+ *
+ * Config ID - An identifier associated with a subset of the configuration snapshot used by the job in an application attempt.
+ * The current prefixes that impact the identifier are job.autosizing.*
+ * The properties of the config identifier are as follows:
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the subset of configuration remains unchanged.

Review comment:
       Does it change across deployment lifecycles? It might stay the same if the config subset is the same, right?

##########
File path: samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job;
+
+import java.util.Objects;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * A data model to represent the metadata of the job coordinator. The metadata refers to attributes of the job
+ * coordinator scoped to an attempt within a deployment. For the purpose of this data model, deployment and attempt
+ * are defined as follows:
+ *
+ * Deployment - Set of actions to stop an existing application, install new binaries and submit a request to run the new binaries
+ * Attempt    - Incarnations of the application within a deployment for fault tolerance; e.g. job coordinator failures or
+ *              job model changes detected by the partition count monitor or regex monitor.
+ *
+ * Metadata generation may require interaction with the underlying cluster manager. The following describes the
+ * properties of the attributes to provide guidelines for implementors of contracts related to metadata generation.
+ *
+ * Epoch ID - An identifier associated with the job coordinator's lifecycle within the scope of a single deployment.
+ * The properties of the epoch identifier are as follows:
+ *    1. Unique across applications in the cluster
+ *    2. Remains unchanged within a single deployment lifecycle
+ *    3. Remains unchanged across application attempts within a single deployment lifecycle
+ *    4. Changes across deployment lifecycles
+ *
+ * Config ID - An identifier associated with a subset of the configuration snapshot used by the job in an application attempt.
+ * The current prefixes that impact the identifier are job.autosizing.*
+ * The properties of the config identifier are as follows:
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the subset of configuration remains unchanged.
+ *
+ * Job Model ID - An identifier associated with the JobModel used by the job in an application attempt. The JobModel
+ * has both configurations and a list of container models. We don't account for changes in the configuration as part
+ * of this identifier since they are separately tracked and handled by the Config ID.
+ * The properties of the job model identifier are as follows:
+ *    1. Reproducible and deterministic

Review comment:
       Wondering if the obvious needs to be stated: "unique" -- i.e., two job models JM1 != JM2 implies JM_id1 != JM_id2? Or more specifically, if JM1.containerModel != JM2.containerModel then JM_id1 != JM_id2?
   
   Or is this what you mean by deterministic?
   
   Because I feel both "reproducible" and "deterministic" mean the same thing -->
   if the n-th run of getJobModelId(JM) = JM_id1,
   then the (n+1)-th run of getJobModelId(JM) should also give JM_id1 itself.
   Maybe I just need some clarification here.
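For what it's worth, the "reproducible and deterministic" property the javadoc describes can be illustrated with the stdlib `java.util.zip.CRC32` (the PR itself uses Guava's crc32c over the serialized container models; this sketch only demonstrates that the same serialized bytes always hash to the same id, while different bytes merely *tend* to hash to different ids -- a CRC does not guarantee uniqueness):

```java
import java.util.zip.CRC32;

public class JobModelIdSketch {
  // Hypothetical helper (not from the PR): hash serialized "container model"
  // bytes into an id, analogous to how the PR hashes jobModel.getContainers().
  static long jobModelId(byte[] serializedContainerModels) {
    CRC32 crc = new CRC32();
    crc.update(serializedContainerModels);
    return crc.getValue();
  }

  public static void main(String[] args) {
    byte[] jm1 = "container-0:[P0,P1] container-1:[P2,P3]".getBytes();
    byte[] jm2 = "container-0:[P0,P1,P2] container-1:[P3]".getBytes();

    // Deterministic/reproducible: repeated runs over the same bytes give the same id.
    System.out.println(jobModelId(jm1) == jobModelId(jm1)); // true

    // Different work assignments give different ids with high probability,
    // but CRC32 is not collision-free, so strict uniqueness is not guaranteed.
    System.out.println(jobModelId(jm1) != jobModelId(jm2));
  }
}
```

So "reproducible" and "deterministic" do coincide here; what the CRC does not give you is the strict injectivity (JM1 != JM2 implies JM_id1 != JM_id2) the comment asks about.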

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Hashing;
+import com.google.common.hash.PrimitiveSink;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class to manage reads and writes of {@link JobCoordinatorMetadata} to the {@link MetadataStore}. It also provides
+ * additional helper functionality to generate {@link JobCoordinatorMetadata} and check for changes across runs.
+ */
+public class JobCoordinatorMetadataManager {
+  private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorMetadataManager.class);
+  private static final String APPLICATION_ATTEMPT_COUNT = "applicationAttemptCount";
+  private static final String JOB_COORDINATOR_MANAGER_METRICS = "job-coordinator-manager";
+  private static final String JOB_MODEL_CHANGED = "jobModelChanged";
+  private static final String CONFIG_CHANGED = "configChanged";
+  private static final String NEW_DEPLOYMENT = "newDeployment";
+
+  static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID";
+  static final String CONTAINER_ID_DELIMITER = "_";
+
+  private final Counter applicationAttemptCount;
+  private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> configChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> newDeployment;
+  private final MetadataStore metadataStore;
+  private final ObjectMapper metadataMapper = SamzaObjectMapper.getObjectMapper();
+  private final Serde<String> valueSerde;
+  private final String clusterType;
+
+  public JobCoordinatorMetadataManager(MetadataStore metadataStore, String clusterType, MetricsRegistry metricsRegistry) {
+    Preconditions.checkState(StringUtils.isNotBlank(clusterType), "Cluster type cannot be empty");
+    this.clusterType = clusterType;
+    this.metadataStore = metadataStore;
+    this.valueSerde = new CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE);
+
+    applicationAttemptCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, APPLICATION_ATTEMPT_COUNT);
+    configChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, CONFIG_CHANGED, 0);
+    jobModelChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, JOB_MODEL_CHANGED, 0);
+    newDeployment = metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, NEW_DEPLOYMENT, 0);
+  }
+
+  /**
+   * Generates {@link JobCoordinatorMetadata} for the {@link JobCoordinator}.
+   *
+   * Epoch ID - It is generated by {@link #fetchEpochIdForJobCoordinator()}. Refer to the javadocs for more
+   * details on how it is generated and the properties of the identifier.
+   *
+   * Config ID - A unique and reproducible identifier that is generated based on the input {@link Config}. It uses
+   * a {@link Funnel} to feed a subset of the input configuration into the hash, and as long as that subset
+   * of the configuration remains the same, the identifier is guaranteed to be the same. For the list of config
+   * prefixes used by the funnel, refer to {@link ConfigHashFunnel}.
+   *
+   * JobModel ID - A unique and reproducible identifier that is generated based on the input {@link JobModel}. It only
+   * uses the {@link org.apache.samza.job.model.ContainerModel} within the {@linkplain JobModel} for generation. We
+   * serialize the data into bytes and use those bytes to compute the identifier.
+   *
+   * In the case of YARN, the epoch identifier is extracted from the application attempt and corresponds to the
+   * applicationId, e.g. 1606797336059_0010.
+   * Both the config and job model identifiers should be a 32-bit integer.
+   *
+   * @param jobModel job model used for generating the metadata
+   * @param config config used for generating the metadata
+   *
+   * @return the metadata for the job coordinator
+   */
+  public JobCoordinatorMetadata generateJobCoordinatorMetadata(JobModel jobModel, Config config) {
+    try {
+      int jobModelId = Hashing
+          .crc32c()
+          .hashBytes(SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModel.getContainers()))
+          .asInt();
+      int configId = Hashing
+          .crc32()
+          .hashObject(config, new ConfigHashFunnel())
+          .asInt();
+
+      LOG.info("Generated job model id {} and config id {}", jobModelId, configId);
+      return new JobCoordinatorMetadata(fetchEpochIdForJobCoordinator(), String.valueOf(configId),
+          String.valueOf(jobModelId));
+    } catch (Exception e) {
+      LOG.error("Failed to generate metadata for the current attempt due to ", e);
+      throw new RuntimeException("Failed to generate the metadata for the current attempt due to ", e);
+    }
+  }
+
+  /**
+   * Checks for changes between the metadata passed as inputs. Metadata is considered changed if any of the attributes
+   * within {@linkplain JobCoordinatorMetadata} changes.
+   *
+   * We intentionally check each attribute for changes to help us track at this granularity. We want to use this
+   * information to determine if complex handling is required to cater to these changes instead of blindly restarting
+   * all the containers upstream.
+   *
+   * @param newMetadata new metadata to be compared
+   * @param previousMetadata previous metadata to be compared against
+   *
+   * @return true if metadata changed, false otherwise
+   */
+  public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, JobCoordinatorMetadata previousMetadata) {
+    boolean changed = true;
+
+    if (previousMetadata == null) {
+      newDeployment.set(1);
+    } else if (!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+      newDeployment.set(1);
+    } else if (!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+      jobModelChangedAcrossApplicationAttempt.set(1);
+    } else if (!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+      configChangedAcrossApplicationAttempt.set(1);
+    } else {
+      changed = false;
+      applicationAttemptCount.inc();
+    }
+
+    if (changed) {
+      LOG.info("Job coordinator metadata changed from: {} to: {}", previousMetadata, newMetadata);
+    } else {
+      LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+    }
+
+    return changed;
+  }
+
+  /**
+   * Reads the {@link JobCoordinatorMetadata} from the metadata store. It fetches the metadata
+   * associated with the cluster type specified at the creation of the manager.
+   *
+   * @return job coordinator metadata
+   */
+  public JobCoordinatorMetadata readJobCoordinatorMetadata() {
+    JobCoordinatorMetadata metadata = null;
+    for (Map.Entry<String, byte[]> entry : metadataStore.all().entrySet()) {
+      if (clusterType.equals(entry.getKey())) {
+        try {
+          String metadataString = valueSerde.fromBytes(entry.getValue());
+          metadata = metadataMapper.readValue(metadataString, JobCoordinatorMetadata.class);
+          break;
+        } catch (Exception e) {
+          LOG.error("Failed to read job coordinator metadata due to ", e);
+        }
+      }
+    }
+
+    LOG.info("Fetched the job coordinator metadata for cluster {} as {}.", clusterType, metadata);
+    return metadata;
+  }
+
+  /**
+   * Persists the {@link JobCoordinatorMetadata} in the metadata store. The job coordinator metadata is associated
+   * with the cluster type specified at the creation of the manager.
+   *
+   * @param metadata metadata to be persisted
+   *
+   * @return true if the write succeeded, false otherwise
+   */
+  public boolean writeJobCoordinatorMetadata(JobCoordinatorMetadata metadata) {
+    Preconditions.checkNotNull(metadata, "Job coordinator metadata cannot be null");
+
+    boolean writeSucceeded = false;
+    try {
+      String metadataValueString = metadataMapper.writeValueAsString(metadata);
+      metadataStore.put(clusterType, valueSerde.toBytes(metadataValueString));
+      writeSucceeded = true;
+    } catch (Exception e) {
+      LOG.error("Failed to write the job coordinator metadata to metadata store due to ", e);

Review comment:
       Would swallowing the exception lead to issues like incorrect metrics, since previousMetadata is non-existent due to the exception but the metric records a new deployment? Or cause unnecessary restarts of containers?
   
   Isn't this important enough to fail the JC for? Because this will be invoked only when AM HA is enabled, right, so it won't fail a non-AM-HA JC.
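The fail-fast alternative the comment suggests could look roughly like the sketch below. This is not the PR's code: `SimpleStore` and `FailFastWriteSketch` are hypothetical stand-ins for the MetadataStore interaction, used only to show propagating the write failure instead of swallowing it and returning false:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal stand-in for the metadata store's put operation.
interface SimpleStore {
  void put(String key, byte[] value);
}

public class FailFastWriteSketch {
  private final SimpleStore store;

  FailFastWriteSketch(SimpleStore store) {
    this.store = store;
  }

  // Instead of logging and returning false (which would leave no previousMetadata
  // for the next attempt and could mis-report a "new deployment"), propagate the
  // failure so the job coordinator fails fast.
  void writeMetadataOrFail(String clusterType, byte[] serializedMetadata) {
    try {
      store.put(clusterType, serializedMetadata);
    } catch (Exception e) {
      throw new RuntimeException("Failed to write job coordinator metadata", e);
    }
  }

  public static void main(String[] args) {
    Map<String, byte[]> backing = new HashMap<>();
    FailFastWriteSketch ok = new FailFastWriteSketch(backing::put);
    ok.writeMetadataOrFail("yarn", new byte[] {1, 2});
    System.out.println(backing.containsKey("yarn")); // true

    FailFastWriteSketch broken = new FailFastWriteSketch((k, v) -> {
      throw new IllegalStateException("store unavailable");
    });
    try {
      broken.writeMetadataOrFail("yarn", new byte[] {1});
    } catch (RuntimeException e) {
      System.out.println("failed fast: " + e.getCause().getMessage());
    }
  }
}
```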

##########
File path: samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job;
+
+import java.util.Objects;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * A data model to represent the metadata of the job coordinator. The metadata refers to attributes of the job
+ * coordinator scoped to an attempt within a deployment. For the purpose of this data model, deployment and attempt
+ * are defined as follows:
+ *
+ * Deployment - Set of actions to stop an existing application, install new binaries and submit a request to run the new binaries
+ * Attempt    - Incarnations of the application within a deployment for fault tolerance; e.g. job coordinator failures or
+ *              job model changes detected by the partition count monitor or regex monitor.
+ *
+ * Metadata generation may require interaction with the underlying cluster manager. The following describes the
+ * properties of the attributes to provide guidelines for implementors of contracts related to metadata generation.
+ *
+ * Epoch ID - An identifier associated with the job coordinator's lifecycle within the scope of a single deployment.
+ * The properties of the epoch identifier are as follows:
+ *    1. Unique across applications in the cluster
+ *    2. Remains unchanged within a single deployment lifecycle
+ *    3. Remains unchanged across application attempts within a single deployment lifecycle
+ *    4. Changes across deployment lifecycles
+ *
+ * Config ID - An identifier associated with a subset of the configuration snapshot used by the job in an application attempt.
+ * The current prefixes that impact the identifier are job.autosizing.*
+ * The properties of the config identifier are as follows:
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the subset of configuration remains unchanged.
+ *
+ * Job Model ID - An identifier associated with the JobModel used by the job in an application attempt. The JobModel
+ * has both configurations and a list of container models. We don't account for changes in the configuration as part
+ * of this identifier since they are separately tracked and handled by the Config ID.
+ * The properties of the job model identifier are as follows:
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the work assignment remains unchanged
+ *
+ * Notes on interface stability - This class is used internally by Samza for job coordinator high availability in the
+ * YARN deployment offering. It may evolve as the scope expands beyond YARN and is hence unstable.
+ *
+ */
+@InterfaceStability.Unstable
+public class JobCoordinatorMetadata {
+  private final String configId;
+  private final String epochId;
+  private final String jobModelId;
+
+  public JobCoordinatorMetadata(String epochId, String configId, String jobModelId) {

Review comment:
       Should we check notNull here?
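The suggested guard could be sketched with the stdlib `java.util.Objects` (which the class already imports); Guava's `Preconditions.checkNotNull` would work equally well. `JobCoordinatorMetadataSketch` below is a hypothetical stand-in, not the PR's class:

```java
import java.util.Objects;

public class JobCoordinatorMetadataSketch {
  private final String configId;
  private final String epochId;
  private final String jobModelId;

  // Constructor with the suggested null checks: fail at construction time
  // rather than later in equals/hashCode/serialization.
  public JobCoordinatorMetadataSketch(String epochId, String configId, String jobModelId) {
    this.epochId = Objects.requireNonNull(epochId, "epochId cannot be null");
    this.configId = Objects.requireNonNull(configId, "configId cannot be null");
    this.jobModelId = Objects.requireNonNull(jobModelId, "jobModelId cannot be null");
  }

  public static void main(String[] args) {
    try {
      new JobCoordinatorMetadataSketch(null, "c1", "jm1");
    } catch (NullPointerException e) {
      System.out.println(e.getMessage()); // epochId cannot be null
    }
  }
}
```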

##########
File path: samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job;
+
+import java.util.Objects;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * A data model to represent the metadata of the job coordinator. The metadata refers to attributes of the job
+ * coordinator scoped to an attempt within a deployment. For the purpose of this data model, deployment and attempt
+ * are defined as follows:
+ *
+ * Deployment - Set of actions to stop an existing application, install new binaries and submit a request to run the new binaries
+ * Attempt    - Incarnations of the application within a deployment for fault tolerance; e.g. job coordinator failures or
+ *              job model changes detected by the partition count monitor or regex monitor.
+ *
+ * Metadata generation may require interaction with the underlying cluster manager. The following describes the
+ * properties of the attributes to provide guidelines for implementors of contracts related to metadata generation.
+ *
+ * Epoch ID - An identifier associated with the job coordinator's lifecycle within the scope of a single deployment.
+ * The properties of the epoch identifier are as follows:
+ *    1. Unique across applications in the cluster
+ *    2. Remains unchanged within a single deployment lifecycle
+ *    3. Remains unchanged across application attempts within a single deployment lifecycle
+ *    4. Changes across deployment lifecycles
+ *
+ * Config ID - An identifier associated with a subset of the configuration snapshot used by the job in an application attempt.
+ * The current prefixes that impact the identifier are job.autosizing.*
+ * The properties of the config identifier are as follows:
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the subset of configuration remains unchanged.
+ *
+ * Job Model ID - An identifier associated with the JobModel used by the job in an application attempt. The JobModel
+ * has both configurations and a list of container models. We don't account for changes in the configuration as part
+ * of this identifier since they are separately tracked and handled by the Config ID.
+ * The properties of the job model identifier are as follows:
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the work assignment remains unchanged
+ *
+ * Notes on interface stability - This class is used internally by Samza for job coordinator high availability in the
+ * YARN deployment offering. It may evolve as the scope expands beyond YARN and is hence unstable.
+ *
+ */
+@InterfaceStability.Unstable
+public class JobCoordinatorMetadata {
+  private final String configId;
+  private final String epochId;
+  private final String jobModelId;
+
+  public JobCoordinatorMetadata(String epochId, String configId, String jobModelId) {
+    this.configId = configId;
+    this.epochId = epochId;
+    this.jobModelId = jobModelId;
+  }
+
+  public String getConfigId() {
+    return configId;
+  }
+
+  public String getEpochId() {
+    return this.epochId;
+  }
+
+  public String getJobModelId() {
+    return jobModelId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {

Review comment:
       Is this a deep or a shallow equals? It's happening at the object level and not really looking into the 3 ids.
   Ah, this is supposed to catch the case of exactly the same object, right?
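Right -- `this == o` is just the standard reference-equality fast path; the field-by-field comparison of the three ids comes afterwards. A sketch of the full idiomatic pattern (using a hypothetical `EqualsSketch` class, since the rest of the PR's method isn't quoted here):

```java
import java.util.Objects;

public class EqualsSketch {
  private final String configId;
  private final String epochId;
  private final String jobModelId;

  EqualsSketch(String epochId, String configId, String jobModelId) {
    this.epochId = epochId;
    this.configId = configId;
    this.jobModelId = jobModelId;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;            // fast path: exactly the same object
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    EqualsSketch that = (EqualsSketch) o;
    // The "deep" part: compare the three ids field by field.
    return Objects.equals(epochId, that.epochId)
        && Objects.equals(configId, that.configId)
        && Objects.equals(jobModelId, that.jobModelId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(epochId, configId, jobModelId);
  }

  public static void main(String[] args) {
    EqualsSketch a = new EqualsSketch("e1", "c1", "jm1");
    EqualsSketch b = new EqualsSketch("e1", "c1", "jm1");
    System.out.println(a.equals(b));                                   // true: distinct objects, same ids
    System.out.println(a.equals(new EqualsSketch("e2", "c1", "jm1"))); // false: epoch id differs
  }
}
```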

##########
File path: samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job;
+
+import java.util.Objects;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * A data model to represent the metadata of the job coordinator. The metadata refers to attributes of the job
+ * coordinator scoped to an attempt within a deployment. For the purpose of this data model, deployment and attempt
+ * are defined as follows:
+ *
+ * Deployment - Set of actions to stop an existing application, install new binaries and submit a request to run the new binaries
+ * Attempt    - Incarnations of the application within a deployment for fault tolerance; e.g. job coordinator failures or
+ *              job model changes detected by the partition count monitor or regex monitor.
+ *
+ * Metadata generation may require interaction with the underlying cluster manager. The following describes the
+ * properties of the attributes to provide guidelines for implementors of contracts related to metadata generation.
+ *
+ * Epoch ID - An identifier associated with the job coordinator's lifecycle within the scope of a single deployment.
+ * The properties of the epoch identifier are as follows:
+ *    1. Unique across applications in the cluster
+ *    2. Remains unchanged within a single deployment lifecycle
+ *    3. Remains unchanged across application attempts within a single deployment lifecycle
+ *    4. Changes across deployment lifecycles
+ *
+ * Config ID - An identifier associated with a subset of the configuration snapshot used by the job in an application attempt.
+ * The current prefixes that impact the identifier are job.autosizing.*
+ * The properties of the config identifier are as follows:
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the subset of configuration remains unchanged.
+ *
+ * Job Model ID - An identifier associated with the JobModel used by the job in an application attempt. The JobModel
+ * has both configurations and a list of container models. We don't account for changes in the configuration as part
+ * of this identifier since they are separately tracked and handled by the Config ID.
+ * The properties of the job model identifier are as follows:
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the work assignment remains unchanged

Review comment:
       Same question here: does it change across deployment lifecycles?

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Hashing;
+import com.google.common.hash.PrimitiveSink;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class to manage reads and writes of {@link JobCoordinatorMetadata} to the {@link MetadataStore}. It also provides
+ * additional helper functionality to generate {@link JobCoordinatorMetadata} and check for changes across runs.
+ */
+public class JobCoordinatorMetadataManager {
+  private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorMetadataManager.class);
+  private static final String APPLICATION_ATTEMPT_COUNT = "applicationAttemptCount";
+  private static final String JOB_COORDINATOR_MANAGER_METRICS = "job-coordinator-manager";
+  private static final String JOB_MODEL_CHANGED = "jobModelChanged";
+  private static final String CONFIG_CHANGED = "configChanged";
+  private static final String NEW_DEPLOYMENT = "newDeployment";
+
+  static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID";
+  static final String CONTAINER_ID_DELIMITER = "_";
+
+  private final Counter applicationAttemptCount;
+  private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> configChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> newDeployment;
+  private final MetadataStore metadataStore;
+  private final ObjectMapper metadataMapper = SamzaObjectMapper.getObjectMapper();
+  private final Serde<String> valueSerde;
+  private final String clusterType;
+
+  public JobCoordinatorMetadataManager(MetadataStore metadataStore, String clusterType, MetricsRegistry metricsRegistry) {

Review comment:
       what values can cluster type take? should we have an enum for this maybe?
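If the enum route is taken, a minimal sketch could look like the following. The `ClusterType` name and the `YARN`/`STANDALONE` constants are hypothetical, not from this PR; the PR currently accepts a free-form String checked only for blankness.

```java
// A hedged sketch of the enum the reviewer suggests; the ClusterType name
// and the YARN/STANDALONE constants are illustrative, not from this PR.
enum ClusterType {
  YARN,
  STANDALONE;

  // Normalizes free-form input such as "yarn" or " Standalone " to a
  // constant; throws IllegalArgumentException for unknown cluster types,
  // replacing the blank-string Preconditions check on the raw String.
  static ClusterType fromString(String value) {
    return ClusterType.valueOf(value.trim().toUpperCase());
  }
}

class ClusterTypeDemo {
  public static void main(String[] args) {
    System.out.println(ClusterType.fromString("yarn"));         // YARN
    System.out.println(ClusterType.fromString(" Standalone ")); // STANDALONE
  }
}
```

An enum would also make the metadata store key (`metadataStore.put(clusterType, ...)`) a closed set, e.g. via `ClusterType.name()`.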

##########
File path: 
samza-api/src/main/java/org/apache/samza/job/JobCoordinatorMetadata.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job;
+
+import java.util.Objects;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * A data model to represent the metadata of the job coordinator. The metadata refers to attributes of the job
+ * coordinator scoped to an attempt within a deployment. For the purpose of this data model, deployment and attempt
+ * are defined as follows:
+ *
+ * Deployment - Set of actions to stop an existing application, install new binaries and submit a request to run the new binaries
+ * Attempt    - Incarnations of the application within a deployment for fault tolerance; e.g. job coordinator failures or
+ *              job model changes detected by the partition count monitor or regex monitor.
+ *
+ * Metadata generation may require interaction with the underlying cluster manager. The following describes the properties
+ * of the attributes to provide guidelines for implementors of contracts related to metadata generation.
+ *
+ * Epoch ID - An identifier associated with the job coordinator's lifecycle within the scope of a single deployment.
+ * The properties of the epoch identifier are as follows:
+ *    1. Unique across applications in the cluster
+ *    2. Remains unchanged within a single deployment lifecycle
+ *    3. Remains unchanged across application attempts within a single deployment lifecycle
+ *    4. Changes across deployment lifecycles
+ *
+ * Config ID - An identifier associated with the subset of the configuration snapshot used by the job in an application
+ * attempt. The current prefixes that impact the identifier are job.autosizing.*
+ * The properties of the config identifier are as follows:
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the subset of the configuration remains unchanged
+ *
+ * Job Model ID - An identifier associated with the JobModel used by the job in an application attempt. JobModel
+ * contains both the configuration and the list of container models. We don't account for changes in the configuration
+ * as part of this identifier since they are separately tracked and handled by the Config ID.
+ * The properties of the job model identifier are as follows:
+ *    1. Reproducible and deterministic
+ *    2. Remains unchanged across application attempts as long as the work assignment remains unchanged
+ *
+ * Notes on interface stability - This class is used internally by Samza for job coordinator high availability in the
+ * YARN deployment offering. It may evolve as the scope expands beyond YARN and is hence marked unstable.
+ *
+ */
+@InterfaceStability.Unstable
+public class JobCoordinatorMetadata {
+  private final String configId;
+  private final String epochId;
+  private final String jobModelId;
+
+  public JobCoordinatorMetadata(String epochId, String configId, String jobModelId) {
+    this.configId = configId;
+    this.epochId = epochId;
+    this.jobModelId = jobModelId;
+  }
+
+  public String getConfigId() {
+    return configId;
+  }
+
+  public String getEpochId() {
+    return this.epochId;
+  }
+
+  public String getJobModelId() {
+    return jobModelId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof JobCoordinatorMetadata)) {
+      return false;
+    }
+    JobCoordinatorMetadata metadata = (JobCoordinatorMetadata) o;
+    return Objects.equals(configId, metadata.configId) && Objects.equals(epochId, metadata.epochId)

Review comment:
       when the Ids are Strings why not use String.equals? any specific reason for choosing Objects.equals?
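For context on the `Objects.equals` question: the usual motivation is null-safety, since `a.equals(b)` throws if the receiver `a` is null while `Objects.equals(a, b)` does not. A minimal, self-contained demonstration (the `"epoch-1"` value is made up):

```java
import java.util.Objects;

// Demonstrates the usual motivation for Objects.equals over String.equals:
// it is null-safe on both arguments, whereas calling equals on a null
// receiver throws NullPointerException.
class ObjectsEqualsDemo {
  public static void main(String[] args) {
    String a = null;
    String b = "epoch-1";

    System.out.println(Objects.equals(a, b));       // false, no exception
    System.out.println(Objects.equals(null, null)); // true

    boolean threw = false;
    try {
      a.equals(b); // throws NPE: the receiver is null
    } catch (NullPointerException e) {
      threw = true;
    }
    System.out.println(threw); // true
  }
}
```

Whether null fields are actually possible here depends on what callers pass to the `JobCoordinatorMetadata` constructor, which does not validate its arguments.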

##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Hashing;
+import com.google.common.hash.PrimitiveSink;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage;
+import org.apache.samza.job.JobCoordinatorMetadata;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class to manage reads and writes of {@link JobCoordinatorMetadata} to {@link MetadataStore}. It also provides
+ * additional helper functionality to generate {@link JobCoordinatorMetadata} and check for changes across runs.
+ */
+public class JobCoordinatorMetadataManager {
+  private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorMetadataManager.class);
+  private static final String APPLICATION_ATTEMPT_COUNT = "applicationAttemptCount";
+  private static final String JOB_COORDINATOR_MANAGER_METRICS = "job-coordinator-manager";
+  private static final String JOB_MODEL_CHANGED = "jobModelChanged";
+  private static final String CONFIG_CHANGED = "configChanged";
+  private static final String NEW_DEPLOYMENT = "newDeployment";
+
+  static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID";
+  static final String CONTAINER_ID_DELIMITER = "_";
+
+  private final Counter applicationAttemptCount;
+  private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> configChangedAcrossApplicationAttempt;
+  private final Gauge<Integer> newDeployment;
+  private final MetadataStore metadataStore;
+  private final ObjectMapper metadataMapper = SamzaObjectMapper.getObjectMapper();
+  private final Serde<String> valueSerde;
+  private final String clusterType;
+
+  public JobCoordinatorMetadataManager(MetadataStore metadataStore, String clusterType, MetricsRegistry metricsRegistry) {
+    Preconditions.checkState(StringUtils.isNotBlank(clusterType), "Cluster type cannot be empty");
+    this.clusterType = clusterType;
+    this.metadataStore = metadataStore;
+    this.valueSerde = new CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE);
+
+    applicationAttemptCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, APPLICATION_ATTEMPT_COUNT);
+    configChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, CONFIG_CHANGED, 0);
+    jobModelChangedAcrossApplicationAttempt =
+        metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, JOB_MODEL_CHANGED, 0);
+    newDeployment = metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, NEW_DEPLOYMENT, 0);
+  }
+
+  /**
+   * Generates {@link JobCoordinatorMetadata} for the {@link JobCoordinator}.
+   *
+   * Epoch ID - It is generated by {@link #fetchEpochIdForJobCoordinator()}. Refer to the javadocs for more
+   * details on how it is generated and the properties of the identifier.
+   *
+   * Config ID - A unique and reproducible identifier that is generated based on the input {@link Config}. It uses
+   * a {@link Funnel} to feed a subset of the input configuration into the hash function, and as long as that subset
+   * of the configuration remains the same, the identifier is guaranteed to be the same. For the list of config
+   * prefixes used by the funnel refer to {@link ConfigHashFunnel}.
+   *
+   * JobModel ID - A unique and reproducible identifier that is generated based on the input {@link JobModel}. It only
+   * uses the {@link org.apache.samza.job.model.ContainerModel} within the {@linkplain JobModel} for generation. We
+   * serialize the data into bytes and use those bytes to compute the identifier.
+   *
+   * In case of YARN, the epoch identifier is extracted from the application attempt and translates to the
+   * applicationId, e.g. 1606797336059_0010. Both the config and job model identifiers are 32-bit integers.
+   *
+   * @param jobModel job model used for generating the metadata
+   * @param config config used for generating the metadata
+   *
+   * @return the metadata for the job coordinator
+   */
+  public JobCoordinatorMetadata generateJobCoordinatorMetadata(JobModel jobModel, Config config) {
+    try {
+      int jobModelId = Hashing
+          .crc32c()
+          .hashBytes(SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModel.getContainers()))
+          .asInt();
+      int configId = Hashing
+          .crc32()
+          .hashObject(config, new ConfigHashFunnel())
+          .asInt();
+
+      LOG.info("Generated job model id {} and config id {}", jobModelId, configId);
+      return new JobCoordinatorMetadata(fetchEpochIdForJobCoordinator(), String.valueOf(configId),
+          String.valueOf(jobModelId));
+    } catch (Exception e) {
+      LOG.error("Failed to generate metadata for the current attempt due to ", e);
+      throw new RuntimeException("Failed to generate the metadata for the current attempt due to ", e);
+    }
+  }
+
+  /**
+   * Checks for changes between the metadata passed as inputs. Metadata is considered changed if any of the attributes
+   * within {@linkplain JobCoordinatorMetadata} changes.
+   *
+   * We intentionally check for each change separately to help us track changes at this granularity. We want to use
+   * this information to determine if complex handling is required to cater to these changes instead of blindly
+   * restarting all the containers upstream.
+   *
+   * @param newMetadata new metadata to be compared
+   * @param previousMetadata previous metadata to be compared against
+   *
+   * @return true if metadata changed, false otherwise
+   */
+  public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, JobCoordinatorMetadata previousMetadata) {
+    boolean changed = true;
+
+    if (previousMetadata == null) {
+      newDeployment.set(1);
+    } else if (!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+      newDeployment.set(1);
+    } else if (!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+      jobModelChangedAcrossApplicationAttempt.set(1);
+    } else if (!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+      configChangedAcrossApplicationAttempt.set(1);
+    } else {
+      changed = false;
+      applicationAttemptCount.inc();
+    }
+
+    if (changed) {
+      LOG.info("Job coordinator metadata changed from: {} to: {}", previousMetadata, newMetadata);
+    } else {
+      LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+    }
+
+    return changed;
+  }
+
+  /**
+   * Reads the {@link JobCoordinatorMetadata} from the metadata store. It fetches the metadata
+   * associated with the cluster type specified at the creation of the manager.
+   *
+   * @return job coordinator metadata
+   */
+  public JobCoordinatorMetadata readJobCoordinatorMetadata() {
+    JobCoordinatorMetadata metadata = null;
+    for (Map.Entry<String, byte[]> entry : metadataStore.all().entrySet()) {
+      if (clusterType.equals(entry.getKey())) {
+        try {
+          String metadataString = valueSerde.fromBytes(entry.getValue());
+          metadata = metadataMapper.readValue(metadataString, JobCoordinatorMetadata.class);
+          break;
+        } catch (Exception e) {
+          LOG.error("Failed to read job coordinator metadata due to ", e);
+        }
+      }
+    }
+
+    LOG.info("Fetched the job coordinator metadata for cluster {} as {}.", clusterType, metadata);
+    return metadata;
+  }
+
+  /**
+   * Persists the {@link JobCoordinatorMetadata} in the metadata store. The job coordinator metadata is associated
+   * with the cluster type specified at the creation of the manager.
+   *
+   * @param metadata metadata to be persisted
+   *
+   * @return true if the write succeeded, false otherwise
+   */
+  public boolean writeJobCoordinatorMetadata(JobCoordinatorMetadata metadata) {
+    Preconditions.checkNotNull(metadata, "Job coordinator metadata cannot be null");
+
+    boolean writeSucceeded = false;
+    try {
+      String metadataValueString = metadataMapper.writeValueAsString(metadata);
+      metadataStore.put(clusterType, valueSerde.toBytes(metadataValueString));
+      writeSucceeded = true;
+      LOG.info("Successfully wrote job coordinator metadata: {} for cluster {}.", metadata, clusterType);
+    } catch (Exception e) {
+      LOG.error("Failed to write the job coordinator metadata to the metadata store due to ", e);
+    }
+
+    return writeSucceeded;
+  }
+
+  @VisibleForTesting
+  Counter getApplicationAttemptCount() {
+    return applicationAttemptCount;
+  }
+
+  @VisibleForTesting
+  Gauge<Integer> getJobModelChangedAcrossApplicationAttempt() {
+    return jobModelChangedAcrossApplicationAttempt;
+  }
+
+  @VisibleForTesting
+  Gauge<Integer> getConfigChangedAcrossApplicationAttempt() {
+    return configChangedAcrossApplicationAttempt;
+  }
+
+  @VisibleForTesting
+  Gauge<Integer> getNewDeployment() {
+    return newDeployment;
+  }
+
+  @VisibleForTesting
+  String getEnvProperty(String propertyName) {
+    return System.getenv(propertyName);
+  }
+
+  /**
+   * Generates the epoch ID using the execution container ID that is passed through the system environment. This isn't
+   * an ideal way of generating this ID; we will need some contract between the underlying cluster manager and the
+   * Samza engine around what the epoch ID should look like and what is needed to generate it across different cluster
+   * offerings. Due to the unknowns described above, we leave it as is and keep it simple for now. It is favorable to
+   * keep it this way instead of introducing a loosely defined interface/API and marking it unstable.
+   *
+   * The properties of the epoch identifier are as follows:
+   *  1. Unique across applications in the cluster
+   *  2. Remains unchanged within a single deployment lifecycle
+   *  3. Remains unchanged across application attempts within a single deployment lifecycle
+   *  4. Changes across deployment lifecycles
+   *
+   *  Note: The above properties are something we want to keep intact when extracting this into a well defined
+   *  interface or contract for YARN AM HA to work.
+   *
+   * @return an identifier associated with the job coordinator satisfying the above properties
+   */
+  private String fetchEpochIdForJobCoordinator() {
+    String[] containerIdParts = getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);

Review comment:
       just clarifying - this is very YARN specific, right? should we call out that this is ONLY for YARN at the moment?
   and even within YARN - it is very specific to the way YARN names containers/app attempts right now -- i don't foresee YARN changing this due to backwards compatibility but wanted to be sure we are aware of this.
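To make the YARN coupling concrete: classic YARN container IDs follow the shape `container_<clusterTimestamp>_<appSeq>_<attemptId>_<containerId>`. The PR's actual index arithmetic is cut off in the diff above, so the sketch below is illustrative only, assuming that classic format:

```java
// A sketch of extracting the epoch ID from a YARN container ID, assuming the
// classic format "container_<clusterTimestamp>_<appSeq>_<attemptId>_<containerId>"
// (e.g. "container_1606797336059_0010_01_000001"). The PR's real index
// arithmetic is cut off in the diff, so this is illustrative only.
class EpochIdSketch {
  static final String CONTAINER_ID_DELIMITER = "_";

  static String epochIdFrom(String containerId) {
    String[] parts = containerId.split(CONTAINER_ID_DELIMITER);
    // parts[1] = cluster timestamp, parts[2] = application sequence number;
    // together they match the applicationId form mentioned in the javadoc,
    // e.g. "1606797336059_0010".
    return parts[1] + CONTAINER_ID_DELIMITER + parts[2];
  }

  public static void main(String[] args) {
    System.out.println(epochIdFrom("container_1606797336059_0010_01_000001"));
    // prints "1606797336059_0010"
  }
}
```

Note that newer YARN versions can emit epoched container IDs of the form `container_e<epoch>_...`, which would shift the underscore-delimited segments; that is essentially the fragility the reviewer is pointing at.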




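As an aside on the "reproducible and deterministic" identifier property discussed in the javadocs above: the PR hashes a canonical byte representation with Guava's `crc32()`/`crc32c()`. The sketch below illustrates the same idea with the JDK's `java.util.zip.CRC32` only to stay dependency-free; the config string is made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Illustrates the reproducible-identifier idea behind configId/jobModelId:
// hash a deterministic byte representation, so the same input always yields
// the same id. The PR uses Guava's crc32()/crc32c(); the JDK's CRC32 is used
// here only to keep the sketch self-contained.
class ReproducibleIdSketch {
  static String idFor(String canonicalForm) {
    CRC32 crc = new CRC32();
    crc.update(canonicalForm.getBytes(StandardCharsets.UTF_8));
    // Narrow to int, matching the 32-bit identifiers described in the javadoc.
    return String.valueOf((int) crc.getValue());
  }

  public static void main(String[] args) {
    String snapshot = "job.autosizing.container.count=4"; // hypothetical config subset
    // Same input twice -> same id, i.e. reproducible and deterministic.
    System.out.println(idFor(snapshot).equals(idFor(snapshot))); // true
  }
}
```

This is also why the funnel must feed the configuration in a stable order (e.g. sorted keys): any non-determinism in the byte stream would break property 1 of the config identifier.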