cameronlee314 commented on a change in pull request #1519:
URL: https://github.com/apache/samza/pull/1519#discussion_r691533650
##########
File path:
samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
##########
@@ -134,29 +135,34 @@ public JobCoordinatorMetadata generateJobCoordinatorMetadata(JobModel jobModel,
*
* @return true if metadata changed, false otherwise
*/
- public boolean checkForMetadataChanges(JobCoordinatorMetadata newMetadata, JobCoordinatorMetadata previousMetadata) {
- boolean changed = true;
-
- if (previousMetadata == null) {
- metrics.setNewDeployment(1);
- } else if (!previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
+ public Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata,
+ JobCoordinatorMetadata previousMetadata) {
+ Set<JobMetadataChange> changes = new HashSet<>();
+ boolean newDeployment = false;
+ if (previousMetadata == null || !previousMetadata.getEpochId().equals(newMetadata.getEpochId())) {
metrics.setNewDeployment(1);
- } else if (!previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
- metrics.setJobModelChangedAcrossApplicationAttempt(1);
- } else if (!previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
- metrics.setConfigChangedAcrossApplicationAttempt(1);
- } else {
- changed = false;
- metrics.incrementApplicationAttemptCount();
+ newDeployment = true;
+ changes.add(JobMetadataChange.NEW_DEPLOYMENT);
}
-
- if (changed) {
- LOG.info("Job coordinator metadata changed from: {} to: {}", previousMetadata, newMetadata);
- } else {
+ if (previousMetadata == null || !previousMetadata.getJobModelId().equals(newMetadata.getJobModelId())) {
+ if (!newDeployment) {
+ metrics.setJobModelChangedAcrossApplicationAttempt(1);
+ }
+ changes.add(JobMetadataChange.JOB_MODEL);
+ }
+ if (previousMetadata == null || !previousMetadata.getConfigId().equals(newMetadata.getConfigId())) {
+ if (!newDeployment) {
+ metrics.setConfigChangedAcrossApplicationAttempt(1);
+ }
+ changes.add(JobMetadataChange.CONFIG);
+ }
+ if (changes.isEmpty()) {
LOG.info("Job coordinator metadata {} unchanged.", newMetadata);
+ metrics.incrementApplicationAttemptCount();
+ } else {
+ LOG.info("Job coordinator metadata changed from: {} to: {}", previousMetadata, newMetadata);
Review comment:
IIRC, I did have that early-termination check initially, but I preferred the
current approach, which has exactly one explicit condition for each type of
change. I was thinking about the case where someone adds a new
`JobMetadataChange` value: they will be forced to add a new condition, which
helps ensure the new value is handled properly. With early termination, a new
enum value might get lumped in automatically, which might be undesirable.

I don't think early termination eliminates the check for a new deployment,
because of the semantics of the metrics. `previousMetadata == null` doesn't
cover the case where the epoch id changes, and an epoch id change doesn't
necessarily mean a job model change, so the new-deployment case still needs its
own check.
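To make the metrics argument concrete, here is a simplified, self-contained mirror of the new `checkForMetadataChanges` logic. It is a sketch, not the Samza code: `Metadata` is a hypothetical holder with only the three ids, plain `int` fields stand in for the real metrics object, and it uses `EnumSet` instead of the patch's `HashSet`. The usage in `main` shows an epoch-only change, where only `NEW_DEPLOYMENT` is reported and no across-attempt metric is set.

```java
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;

public class MetadataChangeSketch {
  // Mirrors the constants referenced in the patch; the real enum body is not shown in the diff.
  enum JobMetadataChange { NEW_DEPLOYMENT, JOB_MODEL, CONFIG }

  // Hypothetical stand-in for JobCoordinatorMetadata with only the three ids.
  static final class Metadata {
    final String epochId;
    final String jobModelId;
    final String configId;

    Metadata(String epochId, String jobModelId, String configId) {
      this.epochId = epochId;
      this.jobModelId = jobModelId;
      this.configId = configId;
    }
  }

  // Hypothetical counters standing in for the real metrics object.
  int newDeployment;
  int jobModelChangedAcrossAttempt;
  int configChangedAcrossAttempt;
  int applicationAttemptCount;

  Set<JobMetadataChange> checkForMetadataChanges(Metadata newMetadata, Metadata previousMetadata) {
    Set<JobMetadataChange> changes = EnumSet.noneOf(JobMetadataChange.class);
    boolean newDeploymentDetected = false;
    // One explicit condition per change type, so a new enum value needs its own branch.
    if (previousMetadata == null || !Objects.equals(previousMetadata.epochId, newMetadata.epochId)) {
      newDeployment = 1;
      newDeploymentDetected = true;
      changes.add(JobMetadataChange.NEW_DEPLOYMENT);
    }
    if (previousMetadata == null || !Objects.equals(previousMetadata.jobModelId, newMetadata.jobModelId)) {
      // "Across application attempt" metrics only apply within a single deployment.
      if (!newDeploymentDetected) {
        jobModelChangedAcrossAttempt = 1;
      }
      changes.add(JobMetadataChange.JOB_MODEL);
    }
    if (previousMetadata == null || !Objects.equals(previousMetadata.configId, newMetadata.configId)) {
      if (!newDeploymentDetected) {
        configChangedAcrossAttempt = 1;
      }
      changes.add(JobMetadataChange.CONFIG);
    }
    if (changes.isEmpty()) {
      // Nothing changed: this is just another attempt of the same deployment.
      applicationAttemptCount++;
    }
    return changes;
  }

  public static void main(String[] args) {
    MetadataChangeSketch checker = new MetadataChangeSketch();
    // Epoch changed, job model and config did not.
    Set<JobMetadataChange> changes = checker.checkForMetadataChanges(
        new Metadata("epoch-2", "jm-1", "cfg-1"), new Metadata("epoch-1", "jm-1", "cfg-1"));
    System.out.println(changes); // prints [NEW_DEPLOYMENT]
  }
}
```

Because the epoch-only case yields `NEW_DEPLOYMENT` without `JOB_MODEL`, the epoch comparison cannot be folded into either the `previousMetadata == null` check or the job model check.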
##########
File path:
samza-core/src/main/java/org/apache/samza/job/metadata/JobCoordinatorMetadataManager.java
##########
@@ -207,29 +213,33 @@ public void writeJobCoordinatorMetadata(JobCoordinatorMetadata metadata) {
}
/**
- * Generate the epoch id using the execution container id that is passed through system environment. This isn't ideal
- * way of generating this ID and we will need some contract between the underlying cluster manager and samza engine
- * around what the epoch ID should be like and what is needed to generate is across different cluster offerings.
- * Due to unknowns defined above, we leave it as is and keep it simple for now. It is favorable to keep it this way
- * instead of introducing a loosely defined interface/API and marking it unstable.
- *
* The properties of the epoch identifier are as follows
* 1. Unique across applications in the cluster
* 2. Remains unchanged within a single deployment lifecycle
* 3. Remains unchanged across application attempt within a single deployment lifecycle
* 4. Changes across deployment lifecycle
*
- * Note: The above properties is something we want keep intact when extracting this into a well defined interface
- * or contract for YARN AM HA to work.
- * The format and property used to generate ID is specific to YARN and the specific format of the container name
- * is a public contract by YARN which is likely to remain backward compatible.
+ * For YARN environment:
+ * Generate the epoch id using the execution container id that is passed through system environment. This isn't ideal
+ * way of generating this ID, since it is YARN-specific. This is left as a legacy flow for backwards compatibility, as
+ * the original implementation did not define a cluster-agnostic contract.
+ * The format and property used to generate ID is specific to YARN and the specific format of the container name
+ * is a public contract by YARN which is likely to remain backward compatible.
+ *
+ * For non-YARN environments:
+ * Extract this from the environment variable SAMZA_EPOCH_ID. This is a more generic way of obtaining an epoch id, but
+ * it does require the resource management layer to inject this value.
*
* @return an identifier associated with the job coordinator satisfying the above properties
*/
@VisibleForTesting
String fetchEpochIdForJobCoordinator() {
- String[] containerIdParts = getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
- return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
+ if (ClusterType.YARN.equals(this.clusterType)) {
+ String[] containerIdParts = getEnvProperty(CONTAINER_ID_PROPERTY).split(CONTAINER_ID_DELIMITER);
+ return containerIdParts[1] + CONTAINER_ID_DELIMITER + containerIdParts[2];
+ } else {
+ return getEnvProperty("SAMZA_EPOCH_ID");
Review comment:
Good point, I'll improve docs on this.
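The two branches described in the updated Javadoc can be sketched as follows, reading from an explicit `Map` instead of the process environment so the behavior is easy to test. This is a hedged illustration, not the Samza implementation: the env var name `CONTAINER_ID`, the `"_"` delimiter, and the `NON_YARN` constant are assumptions, since the values of `CONTAINER_ID_PROPERTY`, `CONTAINER_ID_DELIMITER`, and the real `ClusterType` constants are not shown in the diff. It also assumes the classic YARN container id format, without the `e<epoch>_` segment that YARN can include when the ResourceManager epoch is non-zero.

```java
import java.util.Map;

public class EpochIdSketch {
  // Assumed values; the real constants are not shown in the diff.
  static final String CONTAINER_ID_PROPERTY = "CONTAINER_ID";
  static final String CONTAINER_ID_DELIMITER = "_";
  static final String SAMZA_EPOCH_ID = "SAMZA_EPOCH_ID";

  // Hypothetical cluster types for illustration only.
  enum ClusterType { YARN, NON_YARN }

  static String fetchEpochId(ClusterType clusterType, Map<String, String> env) {
    if (clusterType == ClusterType.YARN) {
      String containerId = env.get(CONTAINER_ID_PROPERTY);
      if (containerId == null) {
        throw new IllegalStateException(CONTAINER_ID_PROPERTY + " is not set");
      }
      // container_1234567890123_0001_01_000001 -> [container, 1234567890123, 0001, 01, 000001]
      String[] parts = containerId.split(CONTAINER_ID_DELIMITER);
      // Cluster timestamp + application sequence number: the same for every attempt of one
      // application, and different for each newly submitted application.
      return parts[1] + CONTAINER_ID_DELIMITER + parts[2];
    }
    // Non-YARN: the resource management layer is expected to inject this value.
    return env.get(SAMZA_EPOCH_ID);
  }

  public static void main(String[] args) {
    System.out.println(fetchEpochId(ClusterType.YARN,
        Map.of(CONTAINER_ID_PROPERTY, "container_1234567890123_0001_01_000001")));
    // prints 1234567890123_0001
  }
}
```

Dropping the attempt and container segments is what gives properties 2 and 3 (stable within a deployment and across attempts), while keeping the application id gives properties 1 and 4.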
##########
File path:
samza-core/src/main/java/org/apache/samza/job/metadata/JobMetadataChange.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.metadata;
+
+/**
+ * Provides granularity into changes in job metadata (e.g. new job model, new config).
+ */
+public enum JobMetadataChange {
Review comment:
To me, in the current state of the code, this doesn't seem like an actual
API. Implementations of coordinators use this object, but if that is the
criterion for "API", then you would also need to include all of the other
classes that a coordinator could use (e.g. `StreamMetadataCache`,
`StreamPartitionCountMonitor`, `JobCoordinatorMetadataManager`, etc.).
Therefore, just moving a couple of classes doesn't really accomplish much.

I do think there is a lot of opportunity to refactor these classes into
modules so that these components end up in a more explicit "API" module, but
that is a much bigger scope than this PR.
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -368,8 +368,8 @@ void generateAndUpdateJobCoordinatorMetadata(JobModel jobModel) {
JobCoordinatorMetadataManager jobCoordinatorMetadataManager = createJobCoordinatorMetadataManager();
JobCoordinatorMetadata previousMetadata = jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
- JobCoordinatorMetadata newMetadata = jobCoordinatorMetadataManager.generateJobCoordinatorMetadata(jobModel, config);
Review comment:
Oh, it's pretty misleading that config isn't always fully filled in, and
it's worse because `JobModel` is now part of the API.
I thought I was just doing a minor clean-up, so this doesn't need to be
changed.

Regarding your note about `JobModel` being a container for config:
`JobModel` is the single object that gets coordinated across containers, and
config still feels like something that should be coordinated between
containers, so I'm not totally convinced that config should be pulled out of
`JobModel`. Anyway, that is out of scope for this PR.