Will-Lo commented on code in PR #3886:
URL: https://github.com/apache/gobblin/pull/3886#discussion_r1509402234
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java:
##########
@@ -41,7 +41,7 @@ public class NestingExecOfProcessWorkUnitWorkflowImpl extends
AbstractNestingExe
.build();
private static final ActivityOptions ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(Duration.ofSeconds(999))
+ .setStartToCloseTimeout(Duration.ofMinutes(20))
Review Comment:
We probably want to make this configurable — add a TODO? Otherwise there can
be unforeseen behavior.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -62,11 +61,14 @@
public class ProcessWorkUnitImpl implements ProcessWorkUnit {
private static final int LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE = 100;
+ private static final String MAX_SOURCE_PATHS_TO_LOG_PER_MULTI_WORK_UNIT =
ProcessWorkUnitImpl.class.getName() + ".maxSourcePathsToLogPerMultiWorkUnit";
+ private static final int DEFAULT_MAX_SOURCE_PATHS_TO_LOG_PER_MULTI_WORK_UNIT
= 5;
+
@Override
public int processWorkUnit(WorkUnitClaimCheck wu) {
try (FileSystem fs = Help.loadFileSystemForce(wu)) {
List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
- log.info("WU [{}] - loaded {} workUnits", wu.getCorrelator(),
workUnits.size());
+ log.info("(M)WU [{}] - loaded; found {} workUnits", wu.getCorrelator(),
workUnits.size());
Review Comment:
Is it guaranteed to be multi-workunits here? I think that depends on whether
the job has enough workunits to pack.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpecForBorrowingPriorState.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
+/**
+ * Same as {@link WUProcessingSpec}, but for a "stand-alone" "Work
Fulfillment-only" workflow that leverages the
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s and {@link JobState}
previously persisted by another separate
+ * job execution. Accordingly we wish to adjust our {@link
EventSubmitterContext} to reflect aspects of that original job.
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class WUProcessingSpecForBorrowingPriorState extends WUProcessingSpec {
+ @NonNull
+ private List<Tag<?>> tags = new ArrayList<>();
+ @NonNull private String metricsSuffix =
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
+
+ public WUProcessingSpecForBorrowingPriorState(URI fileSystemUri, String
workUnitsDir, EventSubmitterContext eventSubmitterContext) {
+ super(fileSystemUri, workUnitsDir, eventSubmitterContext);
+ }
+
+ @Override
+ public boolean isToDoJobLevelTiming() {
+ return true;
+ }
+
+ @Override
+ public @NonNull EventSubmitterContext getEventSubmitterContext() {
+ // NOTE: We are using the metrics tags from Job Props to create the metric
context for the timer and NOT
+ // the deserialized jobState from HDFS that is created by the real distcp
job. This is because the AZ runtime
+ // settings we want are for the job launcher that launched this Yarn job.
+ try {
+ FileSystem fs = Help.loadFileSystemForce(this);
+ JobState jobState = Help.loadJobStateUncached(this, fs);
+ List<Tag<?>> tagsFromCurrentJob = this.getTags();
+ String metricsSuffix = this.getMetricsSuffix();
+ List<Tag<?>> tags = this.calcMergedTags(tagsFromCurrentJob,
metricsSuffix, jobState);
+ return new EventSubmitterContext(tags, JobMetrics.NAMESPACE);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ private List<Tag<?>> calcMergedTags(List<Tag<?>> tagsFromCurJob, String
metricsSuffix, JobState jobStateFromHdfs) {
+ // Construct new tags list by combining subset of tags on HDFS job state
and the rest of the fields from the current job
+ Map<String, Tag<?>> tagsMap = new HashMap<>();
+ Set<String> tagKeysFromJobState = new HashSet<>(Arrays.asList(
+ TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+ TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+ TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+ TimingEvent.FlowEventConstants.JOB_GROUP_FIELD));
+
+ // Step 1, Add tags from the AZ props using the original job (the one that
launched this yarn app)
+ tagsFromCurJob.forEach(tag -> tagsMap.put(tag.getKey(), tag));
+
+ // Step 2. Add tags from the jobState (the original MR job on HDFS)
+ List<String> targetKeysToAddSuffix =
Arrays.asList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ GobblinMetrics.getCustomTagsFromState(jobStateFromHdfs).stream()
+ .filter(tag -> tagKeysFromJobState.contains(tag.getKey()))
+ .forEach(tag -> {
+ // Step 2a (optional): Add a suffix to the FLOW_NAME_FIELD AND
FLOW_GROUP_FIELDS to prevent collisions when testing
+ String value = targetKeysToAddSuffix.contains(tag.getKey())
+ ? tag.getValue() + metricsSuffix
+ : String.valueOf(tag.getValue());
+ tagsMap.put(tag.getKey(), new Tag<>(tag.getKey(), value));
+ });
+
+ // Step 3: Overwrite any pre-existing metadata with name of the current
caller
+ tagsMap.put(GobblinMetricsKeys.CLASS_META, new
Tag<>(GobblinMetricsKeys.CLASS_META, getClass().getCanonicalName()));
+ return new ArrayList<>(tagsMap.values());
+ }
+}
Review Comment:
Does this make sense to move into `Help` or some other util class rather
than tying it into `WUProcessingSpec`? It seems generic enough to be useful
across classes.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpecForBorrowingPriorState.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
+/**
+ * Same as {@link WUProcessingSpec}, but for a "stand-alone" "Work
Fulfillment-only" workflow that leverages the
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s and {@link JobState}
previously persisted by another separate
+ * job execution. Accordingly we wish to adjust our {@link
EventSubmitterContext} to reflect aspects of that original job.
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class WUProcessingSpecForBorrowingPriorState extends WUProcessingSpec {
Review Comment:
When would we want to use this type of workflow instead of an E2E workflow?
I think we can shorten the naming here to just be
`PriorStateWUProcessingSpec`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]