lakshmi-manasa-g commented on a change in pull request #1449: URL: https://github.com/apache/samza/pull/1449#discussion_r538630997
########## File path: samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorMetadataManager.java ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.hash.Funnel; +import com.google.common.hash.Hashing; +import com.google.common.hash.PrimitiveSink; +import java.util.Map; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde; +import org.apache.samza.coordinator.stream.messages.SetJobCoordinatorMetadataMessage; +import org.apache.samza.job.JobCoordinatorMetadata; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.metadatastore.MetadataStore; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.serializers.Serde; +import 
org.apache.samza.serializers.model.SamzaObjectMapper; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A class to manage read and writes of {@link JobCoordinatorMetadata} to {@link MetadataStore}. It also provides + * additional helper functionalities to generate {@link JobCoordinatorMetadata} and check for changes across runs. + */ +public class JobCoordinatorMetadataManager { + private static final Logger LOG = LoggerFactory.getLogger(JobCoordinatorMetadataManager.class); + private static final String APPLICATION_ATTEMPT_COUNT = "applicationAttemptCount"; + private static final String JOB_COORDINATOR_MANAGER_METRICS = "job-coordinator-manager"; + private static final String JOB_MODEL_CHANGED = "jobModelChanged"; + private static final String CONFIG_CHANGED = "configChanged"; + private static final String NEW_DEPLOYMENT = "newDeployment"; + + static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID"; + static final String CONTAINER_ID_DELIMITER = "_"; + + private final Counter applicationAttemptCount; + private final Gauge<Integer> jobModelChangedAcrossApplicationAttempt; + private final Gauge<Integer> configChangedAcrossApplicationAttempt; + private final Gauge<Integer> newDeployment; + private final MetadataStore metadataStore; + private final ObjectMapper metadataMapper = SamzaObjectMapper.getObjectMapper(); + private final Serde<String> valueSerde; + private final ClusterType clusterType; + + public JobCoordinatorMetadataManager(MetadataStore metadataStore, ClusterType clusterType, MetricsRegistry metricsRegistry) { + Preconditions.checkNotNull(clusterType, "Cluster type cannot be null"); + this.clusterType = clusterType; + this.metadataStore = metadataStore; + this.valueSerde = new CoordinatorStreamValueSerde(SetJobCoordinatorMetadataMessage.TYPE); + + applicationAttemptCount = metricsRegistry.newCounter(JOB_COORDINATOR_MANAGER_METRICS, APPLICATION_ATTEMPT_COUNT); + 
configChangedAcrossApplicationAttempt = + metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, CONFIG_CHANGED, 0); + jobModelChangedAcrossApplicationAttempt = + metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, JOB_MODEL_CHANGED, 0); + newDeployment = metricsRegistry.newGauge(JOB_COORDINATOR_MANAGER_METRICS, NEW_DEPLOYMENT, 0); + } + + /** + * Generates {@link JobCoordinatorMetadata} for the {@link JobCoordinator}. + * + * Epoch ID - It is generated by {@link #fetchEpochIdForJobCoordinator()}. Refer to the javadocs for more + * details on how it is generated and the properties of the identifier. + * + * Config ID - A unique and reproducible identifier that is generated based on the input {@link Config}. It uses + * a {@link Funnel} to use a subset of the input configuration to generate the identifier and as long as the subset + * of the configuration remains the same, the identifier is guaranteed to be the same. For the list of config prefixes used + * by the funnel refer to {@link ConfigHashFunnel} + * + * JobModel ID - A unique and reproducible identifier that is generated based on the input {@link JobModel}. It only + * uses the {@link org.apache.samza.job.model.ContainerModel} within the {@linkplain JobModel} for generation. We + * serialize the data into bytes and use those bytes to compute the identifier. + * + * In case of YARN, the epoch identifier is extracted from the application attempt and translates to applicationId + * e.g. 1606797336059_0010 + * Both config and job model identifiers should be a 32-bit integer. 
+ * + * @param jobModel job model used for generating the metadata + * @param config config used for generating the metadata + * + * @return the metadata for the job coordinator + */ + public JobCoordinatorMetadata generateJobCoordinatorMetadata(JobModel jobModel, Config config) { + try { + int jobModelId = Hashing + .crc32c() + .hashBytes(SamzaObjectMapper.getObjectMapper().writeValueAsBytes(jobModel.getContainers())) + .asInt(); + int configId = Hashing + .crc32() + .hashObject(config, new ConfigHashFunnel()) + .asInt(); + + LOG.info("Generated job model id {} and config id {}", jobModelId, configId); + return new JobCoordinatorMetadata(fetchEpochIdForJobCoordinator(), String.valueOf(configId), + String.valueOf(jobModelId)); + } catch (Exception e) { + LOG.error("Failed to generate metadata for the current attempt due to ", e); + throw new RuntimeException("Failed to generate the metadata for the current attempt due to ", e); + } + } + + /** + * Check for changes between the metadata passed as inputs. Metadata is considered changed if any of the attributes within + * {@linkplain JobCoordinatorMetadata} changes. + * + * We intentionally check for each change to help us track at this granularity. We want to use this information + * to determine if complex handling is required to cater to these changes instead of blindly restarting all the + * containers upstream. 
+ * + * @param newMetadata new metadata to be compared + * @param previousMetadata previous metadata to be compared against + * + * @return true if metadata changed, false otherwise + */ + public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, JobCoordinatorMetadata previousMetadata) { + boolean changed = true; + + if (previousMetadata == null) { + newDeployment.set(1); + } else if (!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) { + newDeployment.set(1); + } else if (!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) { + jobModelChangedAcrossApplicationAttempt.set(1); + } else if (!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) { + configChangedAcrossApplicationAttempt.set(1); + } else { + changed = false; + applicationAttemptCount.inc(); + } + + if (changed) { + LOG.info("Job coordinator metadata changed from: {} to: {}", previousMetadata, newMetadata); + } else { + LOG.info("Job coordinator metadata {} unchanged.", newMetadata); + } + + return changed; + } + + /** + * Reads the {@link JobCoordinatorMetadata} from the metadata store. It fetches the metadata + * associated with cluster type specified at the creation of the manager. + * + * @return job coordinator metadata + */ + public JobCoordinatorMetadata readJobCoordinatorMetadata() { + JobCoordinatorMetadata metadata = null; + for (Map.Entry<String, byte[]> entry : metadataStore.all().entrySet()) { + if (clusterType.name().equals(entry.getKey())) { + try { + String metadataString = valueSerde.fromBytes(entry.getValue()); + metadata = metadataMapper.readValue(metadataString, JobCoordinatorMetadata.class); + break; + } catch (Exception e) { + LOG.error("Failed to read job coordinator metadata due to ", e); Review comment: same here.. maybe we should not swallow the exception here also. This is crucial piece of AM HA and also behind the config for it i guess.. so we can just fail JC here. 
if we don't fail JC then what is the behavior when it couldn't read metadata? how does it determine what actions to take? are we planning to blanket restart all containers in this case? what about updating the metrics? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
