mynameborat commented on a change in pull request #1519:
URL: https://github.com/apache/samza/pull/1519#discussion_r690861039
##########
File path:
samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
##########
@@ -207,29 +213,33 @@ public void
writeJobCoordinatorMetadata(JobCoordinatorMetadata metadata) {
}
/**
- * Generate the epoch id using the execution container id that is passed
through system environment. This isn't ideal
- * way of generating this ID and we will need some contract between the
underlying cluster manager and samza engine
- * around what the epoch ID should be like and what is needed to generate is
across different cluster offerings.
- * Due to unknowns defined above, we leave it as is and keep it simple for
now. It is favorable to keep it this way
- * instead of introducing a loosely defined interface/API and marking it
unstable.
- *
* The properties of the epoch identifier are as follows
* 1. Unique across applications in the cluster
* 2. Remains unchanged within a single deployment lifecycle
* 3. Remains unchanged across application attempt within a single
deployment lifecycle
* 4. Changes across deployment lifecycle
*
- * Note: The above properties is something we want keep intact when
extracting this into a well defined interface
- * or contract for YARN AM HA to work.
- * The format and property used to generate ID is specific to YARN and the
specific format of the container name
- * is a public contract by YARN which is likely to remain backward
compatible.
+ * For YARN environment:
+ * Generate the epoch id using the execution container id that is passed
through system environment. This isn't ideal
+ * way of generating this ID, since it is YARN-specific. This is left as a
legacy flow for backwards compatibility, as
+ * the original implementation did not define a cluster-agnostic contract.
+ * The format and property used to generate ID is specific to YARN and the
specific format of the container name
+ * is a public contract by YARN which is likely to remain backward
compatible.
+ *
+ * For non-YARN environments:
+ * Extract this from the environment variable SAMZA_EPOCH_ID. This is a more
generic way of obtaining an epoch id, but
+ * it does require the resource management layer to inject this value.
*
* @return an identifier associated with the job coordinator satisfying the
above properties
*/
@VisibleForTesting
String fetchEpochIdForJobCoordinator() {
- String[] containerIdParts =
getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
- return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
+ if (ClusterType.YARN.equals(this.clusterType)) {
+ String[] containerIdParts =
getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
+ return containerIdParts[1] + CONTAINER_ID_DELIMITER +
containerIdParts[2];
+ } else {
+ return getEnvProperty("SAMZA_EPOCH_ID");
Review comment:
Is it fair to treat this as a configuration that operators of Samza, if
not the end users, should be aware of? If so, can we document the
expected properties and the format/protocol of the value?
Unfortunately, we don't have a central place for the environment
variables we pass to our system, so I am okay if you want to document it
alongside the configurations or in some other place that you find
appropriate.
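To make the ask concrete, here is a minimal sketch of how the contract could be stated and enforced next to the code. Only the variable name `SAMZA_EPOCH_ID` comes from the diff above; the class `EpochIdSketch`, the helper `resolveEpochId`, and the fail-fast behavior are hypothetical, and the sketch checks presence only, since the value's format is exactly what still needs to be defined.

```java
import java.util.Map;

class EpochIdSketch {
  // Variable name taken from the diff above.
  static final String EPOCH_ID_ENV = "SAMZA_EPOCH_ID";

  /**
   * Hypothetical helper, not part of the PR.
   * Expected properties of the value (from the javadoc in the diff):
   *   1. unique across applications in the cluster
   *   2. unchanged across application attempts within a deployment
   *   3. changes across deployment lifecycles
   * The format is not defined in the source, so only presence is checked.
   */
  static String resolveEpochId(Map<String, String> env) {
    String epochId = env.get(EPOCH_ID_ENV);
    if (epochId == null || epochId.trim().isEmpty()) {
      // Fail fast with a message that points operators at the missing variable.
      throw new IllegalStateException(EPOCH_ID_ENV + " must be set to a non-empty value");
    }
    return epochId;
  }
}
```

A caller would pass `System.getenv()`; taking the map as a parameter keeps the helper testable without touching the process environment.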
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -368,8 +368,8 @@ void generateAndUpdateJobCoordinatorMetadata(JobModel
jobModel) {
JobCoordinatorMetadataManager jobCoordinatorMetadataManager =
createJobCoordinatorMetadataManager();
JobCoordinatorMetadata previousMetadata =
jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
- JobCoordinatorMetadata newMetadata =
jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, config);
Review comment:
There are scenarios where the job model object doesn't have the config
populated within it, e.g., standalone.
Hence the signature that explicitly passes config around. IMO, `JobModel` being
a container for configuration isn't ideal, and I had to keep
it that way for back-compat reasons in YARN (a single servlet exposes the
configuration + JobModel to workers).
That said, the current model ties updates of one to the other and
forces us to update the container object `JobModel` as a whole. In YARN, this
isn't a problem, because both share the same lifecycle.
Do you have any strong reasons behind making this change? If not, I'd
prefer to keep it the way it is so that it gives us the flexibility to not
depend on the config within `JobModel` and potentially helps on the path to
extracting it from `JobModel`.
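A rough sketch of the point, with hypothetical stand-ins only: `JobModelStub`, `generateIds`, and the hash-based ids below are illustrative and are not how Samza computes these values. The idea is that passing config explicitly keeps the config id well defined even when the job model carries no config.

```java
import java.util.Map;

class ExplicitConfigSketch {
  /** Hypothetical stand-in for JobModel. */
  static final class JobModelStub {
    final Map<String, String> containers;     // placeholder for container assignments
    final Map<String, String> embeddedConfig; // may be empty, e.g. in standalone

    JobModelStub(Map<String, String> containers, Map<String, String> embeddedConfig) {
      this.containers = containers;
      this.embeddedConfig = embeddedConfig;
    }
  }

  /**
   * Returns {jobModelId, configId}. The config id is derived from the
   * explicitly passed config and never reads jobModel.embeddedConfig.
   * The hash-based ids are an assumption made for illustration.
   */
  static String[] generateIds(JobModelStub jobModel, Map<String, String> config) {
    String jobModelId = Integer.toHexString(jobModel.containers.hashCode());
    String configId = Integer.toHexString(config.hashCode());
    return new String[] {jobModelId, configId};
  }
}
```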
##########
File path:
samza-core/src/main/java/org/apache/samza/job/metadata/JobMetadataChange.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.metadata;
+
+/**
+ * Provides granularity into changes in job metadata (e.g. new job model, new
config).
+ */
+public enum JobMetadataChange {
Review comment:
Can we move this under samza-api along with `JobCoordinatorMetadata`?
##########
File path:
samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
##########
@@ -134,29 +135,34 @@ public JobCoordinatorMetadata
generateJobCoordinatorMetadata(JobModel jobModel,
*
* @return true if metadata changed, false otherwise
*/
- public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata,
JobCoordinatorMetadata previousMetadata) {
- boolean changed = true;
-
- if (previousMetadata == null) {
- metrics.setNewDeployment(1);
- } else if
(!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+ public Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata
newMetadata,
+ JobCoordinatorMetadata previousMetadata) {
+ Set<JobMetadataChange> changes = new HashSet<>();
+ boolean newDeployment = false;
+ if (previousMetadata == null ||
!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
metrics.setNewDeployment(1);
- } else if
(!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
- metrics.setJobModelChangedAcrossApplicationAttempt(1);
- } else if
(!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
- metrics.setConfigChangedAcrossApplicationAttempt(1);
- } else {
- changed = false;
- metrics.incrementApplicationAttemptCount();
+ newDeployment = true;
+ changes.add(JobMetadataChange.NEW_DEPLOYMENT);
}
-
- if (changed) {
- LOG.info("Job coordinator metadata changed from: {} to: {}",
previousMetadata, newMetadata);
- } else {
+ if (previousMetadata == null ||
!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+ if (!newDeployment) {
+ metrics.setJobModelChangedAcrossApplicationAttempt(1);
+ }
+ changes.add(JobMetadataChange.JOB_MODEL);
+ }
+ if (previousMetadata == null ||
!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+ if (!newDeployment) {
+ metrics.setConfigChangedAcrossApplicationAttempt(1);
+ }
+ changes.add(JobMetadataChange.CONFIG);
+ }
+ if (changes.isEmpty()) {
LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+ metrics.incrementApplicationAttemptCount();
+ } else {
+ LOG.info("Job coordinator metadata changed from: {} to: {}",
previousMetadata, newMetadata);
Review comment:
Thoughts on early termination when `previousMetadata` is `null`? I am
not a fan of multiple return statements in general, but it seems to help with
readability in this case and eliminates the need to check for a new deployment
in every branch.
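One possible shape of the suggestion, as a self-contained sketch: `Metadata` is a hypothetical stand-in for `JobCoordinatorMetadata`, the enum mirrors the values used in the diff, and the metrics and logging calls are left out for brevity. The assumption is that a missing previous metadata reports every change, matching what the diff does when `previousMetadata == null`.

```java
import java.util.EnumSet;
import java.util.Set;

class EarlyReturnSketch {
  // Mirrors the enum values referenced in the diff.
  enum JobMetadataChange { NEW_DEPLOYMENT, JOB_MODEL, CONFIG }

  /** Hypothetical stand-in for JobCoordinatorMetadata. */
  static final class Metadata {
    final String epochId;
    final String jobModelId;
    final String configId;

    Metadata(String epochId, String jobModelId, String configId) {
      this.epochId = epochId;
      this.jobModelId = jobModelId;
      this.configId = configId;
    }
  }

  static Set<JobMetadataChange> checkForMetadataChanges(Metadata newMetadata, Metadata previousMetadata) {
    if (previousMetadata == null) {
      // No previous metadata: treat everything as changed and return early,
      // so the comparisons below need no null checks.
      return EnumSet.allOf(JobMetadataChange.class);
    }
    Set<JobMetadataChange> changes = EnumSet.noneOf(JobMetadataChange.class);
    if (!previousMetadata.epochId.equals(newMetadata.epochId)) {
      changes.add(JobMetadataChange.NEW_DEPLOYMENT);
    }
    if (!previousMetadata.jobModelId.equals(newMetadata.jobModelId)) {
      changes.add(JobMetadataChange.JOB_MODEL);
    }
    if (!previousMetadata.configId.equals(newMetadata.configId)) {
      changes.add(JobMetadataChange.CONFIG);
    }
    return changes;
  }
}
```

The per-attempt metrics from the diff would still need to know whether the epoch id changed, so the early return removes the repeated null checks but not necessarily every new-deployment check.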