This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new fee5a0e74 [GOBBLIN-2010] Implement Distributed Data Movement (DDM)
Gobblin-on-Temporal end-to-end workflow for arbitrary job config (#3886)
fee5a0e74 is described below
commit fee5a0e747812b7676f0be1fef357fbc9d3e0502
Author: Kip Kohn <[email protected]>
AuthorDate: Fri Mar 1 17:00:53 2024 -0800
[GOBBLIN-2010] Implement Distributed Data Movement (DDM)
Gobblin-on-Temporal end-to-end workflow for arbitrary job config (#3886)
Implement Distributed Data Movement (DDM) Gobblin-on-Temporal end-to-end
workflow for any arbitrary job config
---
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 5 -
.../ddm/activity/impl/ProcessWorkUnitImpl.java | 65 ++++++------
.../ddm/launcher/ExecuteGobblinJobLauncher.java | 96 +++++++++++++++++
.../ddm/launcher/ProcessWorkUnitsJobLauncher.java | 15 ++-
.../gobblin/temporal/ddm/util/JobStateUtils.java | 30 +++++-
.../ddm/work/PriorJobStateWUProcessingSpec.java | 117 +++++++++++++++++++++
.../temporal/ddm/work/WUProcessingSpec.java | 21 ++--
.../temporal/ddm/worker/WorkFulfillmentWorker.java | 4 +-
.../workflow/ExecuteGobblinWorkflow.java} | 29 +++--
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 111 +++++++++++++++++++
.../NestingExecOfProcessWorkUnitWorkflowImpl.java | 3 +-
.../impl/ProcessWorkUnitsWorkflowImpl.java | 89 +++-------------
.../workflows/metrics/SubmitGTEActivityImpl.java | 17 ++-
13 files changed, 460 insertions(+), 142 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 797ba7951..2cb95a5d8 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -19,12 +19,10 @@ package org.apache.gobblin.temporal.ddm.activity.impl;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
import java.util.Properties;
import com.google.api.client.util.Lists;
import com.google.common.io.Closer;
-import com.typesafe.config.Config;
import io.temporal.failure.ApplicationFailure;
@@ -43,7 +41,6 @@ import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
-import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@@ -59,8 +56,6 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
// TODO: provide for job cancellation (unless handling at the
temporal-level of parent workflows)!
JobState jobState = new JobState(jobProps);
log.info("Created jobState: {}", jobState.toJsonString(true));
- Optional<Config> thisClassConfig = WorkerConfig.of(this);
- log.info("Obtained class config: {}", thisClassConfig.isPresent() ?
thisClassConfig.get() : "NO WORKER CONFIG: ERROR!");
Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
log.info("Using work dir root path for job '{}' - '{}'",
jobState.getJobId(), workDirRoot);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index 640d78da1..1aebf4318 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -44,7 +44,6 @@ import org.apache.gobblin.runtime.AbstractTaskStateTracker;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.Task;
-import org.apache.gobblin.runtime.TaskCreationException;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateTracker;
@@ -62,11 +61,14 @@ import org.apache.gobblin.util.JobLauncherUtils;
public class ProcessWorkUnitImpl implements ProcessWorkUnit {
private static final int LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE = 100;
+ private static final String MAX_SOURCE_PATHS_TO_LOG_PER_MULTI_WORK_UNIT =
ProcessWorkUnitImpl.class.getName() + ".maxSourcePathsToLogPerMultiWorkUnit";
+ private static final int DEFAULT_MAX_SOURCE_PATHS_TO_LOG_PER_MULTI_WORK_UNIT
= 5;
+
@Override
public int processWorkUnit(WorkUnitClaimCheck wu) {
try (FileSystem fs = Help.loadFileSystemForce(wu)) {
List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
- log.info("WU [{}] - loaded {} workUnits", wu.getCorrelator(),
workUnits.size());
+ log.info("(M)WU [{}] - loaded; found {} workUnits", wu.getCorrelator(),
workUnits.size());
JobState jobState = Help.loadJobState(wu, fs);
return execute(workUnits, wu, jobState, fs);
} catch (IOException | InterruptedException e) {
@@ -99,32 +101,21 @@ public class ProcessWorkUnitImpl implements
ProcessWorkUnit {
troubleshooter.start();
List<String> fileSourcePaths = workUnits.stream()
- .map(workUnit -> describeAsCopyableFile(workUnit,
wu.getWorkUnitPath()))
+ .map(workUnit -> getCopyableFileSourcePathDesc(workUnit,
wu.getWorkUnitPath()))
.collect(Collectors.toList());
- log.info("WU [{}] - submitting {} workUnits for copying files: {}",
wu.getCorrelator(),
- workUnits.size(), fileSourcePaths);
+ List<String> pathsToLog = getSourcePathsToLog(fileSourcePaths, jobState);
+ log.info("WU [{}] - submitting {} workUnits for copying source files:
{}{}",
+ wu.getCorrelator(),
+ workUnits.size(),
+ pathsToLog.size() == workUnits.size() ? "" : ("**first " +
pathsToLog.size() + " only** "),
+ pathsToLog);
log.debug("WU [{}] - (first) workUnit: {}", wu.getCorrelator(),
workUnits.get(0).toJsonString());
- try {
- GobblinMultiTaskAttempt taskAttempt =
GobblinMultiTaskAttempt.runWorkUnits(
- jobState.getJobId(), containerId, jobState, workUnits,
- taskStateTracker, taskExecutor, taskStateStore,
multiTaskAttemptCommitPolicy,
- resourcesBroker, troubleshooter.getIssueRepository(),
createInterruptionPredicate(fs, jobState));
- return taskAttempt.getNumTasksCreated();
- } catch (TaskCreationException tce) { // derived type of `IOException`
that ought not be caught!
- throw tce;
- } catch (IOException ioe) {
- // presume execution already occurred, with `TaskState` written to
reflect outcome
- log.warn("WU [" + wu.getCorrelator() + "] - continuing on despite
IOException:", ioe);
- return 0;
- }
- }
-
- /** Demonstration processing, to isolate debugging of WU loading and
deserialization */
- protected int countSumProperties(List<WorkUnit> workUnits,
WorkUnitClaimCheck wu) {
- int totalNumProps = workUnits.stream().mapToInt(workUnit ->
workUnit.getPropertyNames().size()).sum();
- log.info("opened WU [{}] to find {} properties total at '{}'",
wu.getCorrelator(), totalNumProps, wu.getWorkUnitPath());
- return totalNumProps;
+ GobblinMultiTaskAttempt taskAttempt = GobblinMultiTaskAttempt.runWorkUnits(
+ jobState.getJobId(), containerId, jobState, workUnits,
+ taskStateTracker, taskExecutor, taskStateStore,
multiTaskAttemptCommitPolicy,
+ resourcesBroker, troubleshooter.getIssueRepository(),
createInterruptionPredicate(fs, jobState));
+ return taskAttempt.getNumTasksCreated();
}
protected TaskStateTracker
createEssentializedTaskStateTracker(WorkUnitClaimCheck wu) {
@@ -157,7 +148,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit
{
};
}
- protected String describeAsCopyableFile(WorkUnit workUnit, String
workUnitPath) {
+ protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String
workUnitPath) {
return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
.map(copyableFile -> copyableFile.getOrigin().getPath().toString())
.orElse(
@@ -173,12 +164,6 @@ public class ProcessWorkUnitImpl implements
ProcessWorkUnit {
return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId()
+ "'");
}
- protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit>
workUnits, String workUnitPath) {
- return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
- getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
- );
- }
-
protected Optional<CopyableFile> getOptCopyableFile(State state, String
logDesc) {
return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(),
copyEntityClass.getName());
@@ -192,6 +177,12 @@ public class ProcessWorkUnitImpl implements
ProcessWorkUnit {
});
}
+ protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit>
workUnits, String workUnitPath) {
+ return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
+ getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
+ );
+ }
+
protected Optional<Class<?>> getOptCopyEntityClass(State state, String
logDesc) {
try {
return Optional.of(CopySource.getCopyEntityClass(state));
@@ -225,4 +216,14 @@ public class ProcessWorkUnitImpl implements
ProcessWorkUnit {
return false;
}
}
+
+ private static List<String> getSourcePathsToLog(List<String> sourcePaths,
JobState jobState) {
+ int maxPathsToLog = getMaxSourcePathsToLogPerMultiWorkUnit(jobState);
+ int numPathsToLog = Math.min(sourcePaths.size(), maxPathsToLog);
+ return sourcePaths.subList(0, numPathsToLog);
+ }
+
+ private static int getMaxSourcePathsToLogPerMultiWorkUnit(State jobState) {
+ return jobState.getPropAsInt(MAX_SOURCE_PATHS_TO_LOG_PER_MULTI_WORK_UNIT,
DEFAULT_MAX_SOURCE_PATHS_TO_LOG_PER_MULTI_WORK_UNIT);
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
new file mode 100644
index 000000000..7d4fcb295
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.launcher;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.client.WorkflowOptions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A {@link JobLauncher} for the initial triggering of a Temporal workflow
that executes a full Gobblin job workflow of:
+ * * Work Discovery (via an arbitrary and configurable {@link
org.apache.gobblin.source.Source})
+ * * Work Fulfillment/Processing
+ * * Commit
+ *
+ * see: {@link ExecuteGobblinWorkflow} *
+ *
+ * <p>
+ * This class is instantiated by the {@link
GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job
submission to launch the Gobblin job.
+ * The actual task execution happens in the {@link
GobblinTemporalTaskRunner}, usually in a different process.
+ * </p>
+ */
+@Slf4j
+public class ExecuteGobblinJobLauncher extends GobblinTemporalJobLauncher {
+
+ public static final String WORKFLOW_ID_BASE = "ExecuteGobblin";
+
+ public ExecuteGobblinJobLauncher(
+ Properties jobProps,
+ Path appWorkDir,
+ List<? extends Tag<?>> metadataTags,
+ ConcurrentHashMap<String, Boolean> runningMap,
+ EventBus eventBus
+ ) throws Exception {
+ super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
+ }
+
+ @Override
+ public void submitJob(List<WorkUnit> workunits) {
+ try {
+ WorkflowOptions options = WorkflowOptions.newBuilder()
+ .setTaskQueue(this.queueName)
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(jobProps)))
+ .build();
+ ExecuteGobblinWorkflow workflow =
this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options);
+
+ Config jobConfigWithOverrides =
applyJobLauncherOverrides(ConfigUtils.propertiesToConfig(this.jobProps));
+
+ Help.propagateGaaSFlowExecutionContext(this.jobProps);
+
+ EventSubmitterContext eventSubmitterContext = new
EventSubmitterContext(this.eventSubmitter);
+
+ int numWorkUnits =
workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides),
eventSubmitterContext);
+ log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", numWorkUnits);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 006572dc0..d943013e5 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -22,14 +22,15 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.hadoop.fs.Path;
-
import com.google.common.eventbus.EventBus;
import com.typesafe.config.ConfigFactory;
import io.temporal.client.WorkflowOptions;
+
import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.Path;
+
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.Tag;
@@ -37,11 +38,13 @@ import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.PriorJobStateWUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.util.PropertiesUtils;
import static
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;
@@ -82,15 +85,14 @@ public class ProcessWorkUnitsJobLauncher extends
GobblinTemporalJobLauncher {
URI nameNodeUri = new URI(PropertiesUtils.getRequiredProp(this.jobProps,
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI));
// NOTE: `Path` is challenging for temporal to ser/de, but nonetheless
do pre-construct as `Path`, to pre-validate this prop string's contents
Path workUnitsDir = new
Path(PropertiesUtils.getRequiredProp(this.jobProps,
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR));
- WUProcessingSpec wuSpec = new WUProcessingSpec(nameNodeUri,
workUnitsDir.toString());
+ EventSubmitterContext eventSubmitterContext = new
EventSubmitterContext(this.eventSubmitter);
+ PriorJobStateWUProcessingSpec wuSpec = new
PriorJobStateWUProcessingSpec(nameNodeUri, workUnitsDir.toString(),
eventSubmitterContext);
if
(this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
&&
this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(this.jobProps,
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(this.jobProps,
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
}
- Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec,
Help.loadFileSystem(wuSpec)));
-
wuSpec.setTags(GobblinMetrics.getCustomTagsFromState(new
State(jobProps)));
wuSpec.setMetricsSuffix(this.jobProps.getProperty(
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX,
@@ -100,6 +102,9 @@ public class ProcessWorkUnitsJobLauncher extends
GobblinTemporalJobLauncher {
.setTaskQueue(this.queueName)
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec,
ConfigFactory.parseProperties(jobProps)))
.build();
+
+ Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec,
Help.loadFileSystem(wuSpec)));
+
ProcessWorkUnitsWorkflow workflow =
this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
workflow.process(wuSpec);
} catch (Exception e) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
index ae6a8b711..37d1f9047 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -73,8 +73,7 @@ public class JobStateUtils {
/** @return the {@link FileSystem} indicated by {@link
ConfigurationKeys#FS_URI_KEY} */
public static FileSystem openFileSystem(JobState jobState) throws
IOException {
- URI fsUri = URI.create(jobState.getProp(ConfigurationKeys.FS_URI_KEY,
ConfigurationKeys.LOCAL_FS_URI));
- return Help.loadFileSystemForUriForce(fsUri, jobState);
+ return Help.loadFileSystemForUriForce(getFileSystemUri(jobState),
jobState);
}
/** @return a new instance of {@link Source} identified by {@link
ConfigurationKeys#SOURCE_CLASS_KEY} */
@@ -104,10 +103,15 @@ public class JobStateUtils {
return new FsStateStore<>(fs, taskStateStorePath.toUri().getPath(),
TaskState.class);
}
+ /** @return the {@link URI} indicated by {@link
ConfigurationKeys#FS_URI_KEY} */
+ public static URI getFileSystemUri(JobState jobState) {
+ return URI.create(jobState.getProp(ConfigurationKeys.FS_URI_KEY,
ConfigurationKeys.LOCAL_FS_URI));
+ }
+
/**
* ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
* {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
- * @return "base" dir root path for work dir (parent of inputs, output task
states, etc.)
+ * @return "base" dir root {@link Path} for work dir (parent of inputs,
output task states, etc.)
*/
public static Path getWorkDirRoot(JobState jobState) {
return new Path(
@@ -118,7 +122,23 @@ public class JobStateUtils {
/**
* ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
* {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
- * @return path to {@link FsStateStore<TaskState>} backing dir
+ * @return {@link Path} where "input" {@link WorkUnit}s should reside
+ */
+ public static Path getWorkUnitsPath(JobState jobState) {
+ return getWorkUnitsPath(getWorkDirRoot(jobState));
+ }
+
+ /**
+ * @return {@link Path} where "input" {@link WorkUnit}s should reside
+ */
+ public static Path getWorkUnitsPath(Path workDirRoot) {
+ return new Path(workDirRoot, INPUT_DIR_NAME);
+ }
+
+ /**
+ * ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+ * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+ * @return {@link Path} to {@link FsStateStore<TaskState>} backing dir
*/
public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) {
Path jobOutputPath = new Path(getWorkDirRoot(jobState), OUTPUT_DIR_NAME);
@@ -129,7 +149,7 @@ public class JobStateUtils {
public static void writeWorkUnits(List<WorkUnit> workUnits, Path
workDirRootPath, JobState jobState, FileSystem fs)
throws IOException {
String jobId = jobState.getJobId();
- Path targetDirPath = new Path(workDirRootPath, INPUT_DIR_NAME);
+ Path targetDirPath = getWorkUnitsPath(workDirRootPath);
int numThreads =
ParallelRunner.getNumThreadsConfig(jobState.getProperties());
Closer closer = Closer.create(); // (NOTE: try-with-resources syntax
wouldn't allow `catch { closer.rethrow(t) }`)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
new file mode 100644
index 000000000..98c375564
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.instrumented.GobblinMetricsKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
+/**
+ * Same as {@link WUProcessingSpec}, but for a "Work Fulfillment-only"
workflow that leverages the {@link JobState} and
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s previously persisted
by another separate job execution.
+ * Accordingly we wish to adjust/"spoof" our {@link EventSubmitterContext} to
carry identifiers from that original job,
+ * and to indicate that the processing workflow ought to perform job-level
timing.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true) // to prevent findbugs warning - "equals
method overrides equals in superclass and may not be symmetric"
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class PriorJobStateWUProcessingSpec extends WUProcessingSpec {
+ @NonNull
+ private List<Tag<?>> tags = new ArrayList<>();
+ @NonNull private String metricsSuffix =
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
+
+ public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir,
EventSubmitterContext eventSubmitterContext) {
+ super(fileSystemUri, workUnitsDir, eventSubmitterContext);
+ }
+
+ @Override
+ public boolean isToDoJobLevelTiming() {
+ return true;
+ }
+
+ @Override
+ public @NonNull EventSubmitterContext getEventSubmitterContext() {
+ // NOTE: We are using the metrics tags from Job Props to create the metric
context for the timer and NOT
+ // the deserialized jobState from HDFS that is created by the real distcp
job. This is because the AZ runtime
+ // settings we want are for the job launcher that launched this Yarn job.
+ try {
+ FileSystem fs = Help.loadFileSystemForce(this);
+ JobState jobState = Help.loadJobStateUncached(this, fs);
+ List<Tag<?>> tagsFromCurrentJob = this.getTags();
+ String metricsSuffix = this.getMetricsSuffix();
+ List<Tag<?>> tags = this.calcMergedTags(tagsFromCurrentJob,
metricsSuffix, jobState);
+ return new EventSubmitterContext(tags, JobMetrics.NAMESPACE);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ private List<Tag<?>> calcMergedTags(List<Tag<?>> tagsFromCurJob, String
metricsSuffix, JobState jobStateFromHdfs) {
+ // Construct new tags list by combining subset of tags on HDFS job state
and the rest of the fields from the current job
+ Map<String, Tag<?>> tagsMap = new HashMap<>();
+ Set<String> tagKeysFromJobState = new HashSet<>(Arrays.asList(
+ TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+ TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+ TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
+ TimingEvent.FlowEventConstants.JOB_GROUP_FIELD));
+
+ // Step 1, Add tags from the AZ props using the original job (the one that
launched this yarn app)
+ tagsFromCurJob.forEach(tag -> tagsMap.put(tag.getKey(), tag));
+
+ // Step 2. Add tags from the jobState (the original MR job on HDFS)
+ List<String> targetKeysToAddSuffix =
Arrays.asList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ GobblinMetrics.getCustomTagsFromState(jobStateFromHdfs).stream()
+ .filter(tag -> tagKeysFromJobState.contains(tag.getKey()))
+ .forEach(tag -> {
+ // Step 2a (optional): Add a suffix to the FLOW_NAME_FIELD AND
FLOW_GROUP_FIELDS to prevent collisions when testing
+ String value = targetKeysToAddSuffix.contains(tag.getKey())
+ ? tag.getValue() + metricsSuffix
+ : String.valueOf(tag.getValue());
+ tagsMap.put(tag.getKey(), new Tag<>(tag.getKey(), value));
+ });
+
+ // Step 3: Overwrite any pre-existing metadata with name of the current
caller
+ tagsMap.put(GobblinMetricsKeys.CLASS_META, new
Tag<>(GobblinMetricsKeys.CLASS_META, getClass().getCanonicalName()));
+ return new ArrayList<>(tagsMap.values());
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index 5e3ba3dd1..30e67eb34 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -18,42 +18,45 @@
package org.apache.gobblin.temporal.ddm.work;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Optional;
-import org.apache.hadoop.fs.Path;
-
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import org.apache.hadoop.fs.Path;
+
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
/**
* Intended to reference multiple {@link
org.apache.gobblin.source.workunit.WorkUnit}s to process, where `workUnitsDir`
- * is resolved against the {@link org.apache.hadoop.fs.FileSystem} given by
`nameNodeUri`. see:
+ * is resolved against the {@link org.apache.hadoop.fs.FileSystem} given by
`nameNodeUri`
*/
@Data
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY,
property = "@class") // to handle extensions
public class WUProcessingSpec implements FileSystemApt, FileSystemJobStateful {
@NonNull private URI fileSystemUri;
@NonNull private String workUnitsDir;
+ @NonNull private EventSubmitterContext eventSubmitterContext;
@NonNull private Tuning tuning = Tuning.DEFAULT;
- @NonNull private List<Tag<?>> tags = new ArrayList<>();
- @NonNull private String metricsSuffix =
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
+
+ /** whether to conduct job-level timing (and send results via GTE) */
+ public boolean isToDoJobLevelTiming() {
+ return false;
+ }
@JsonIgnore // (because no-arg method resembles 'java bean property')
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 02631719c..5256b8b2b 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -28,6 +28,7 @@ import
org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
+import
org.apache.gobblin.temporal.ddm.workflow.impl.ExecuteGobblinWorkflowImpl;
import
org.apache.gobblin.temporal.ddm.workflow.impl.GenerateWorkUnitsWorkflowImpl;
import
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
import
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
@@ -45,7 +46,8 @@ public class WorkFulfillmentWorker extends
AbstractTemporalWorker {
@Override
protected Class<?>[] getWorkflowImplClasses() {
- return new Class[] { CommitStepWorkflowImpl.class,
GenerateWorkUnitsWorkflowImpl.class,
NestingExecOfProcessWorkUnitWorkflowImpl.class,
ProcessWorkUnitsWorkflowImpl.class };
+ return new Class[] { CommitStepWorkflowImpl.class,
ExecuteGobblinWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class,
+ NestingExecOfProcessWorkUnitWorkflowImpl.class,
ProcessWorkUnitsWorkflowImpl.class };
}
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
similarity index 51%
copy from
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
copy to
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
index 63bce421a..d764f3742 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
@@ -15,20 +15,27 @@
* limitations under the License.
*/
-package org.apache.gobblin.temporal.workflows.metrics;
+package org.apache.gobblin.temporal.ddm.workflow;
-import org.slf4j.Logger;
+import java.util.Properties;
-import io.temporal.workflow.Workflow;
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
-import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
-public class SubmitGTEActivityImpl implements SubmitGTEActivity {
- private static Logger log =
Workflow.getLogger(SubmitGTEActivityImpl.class);
-
- @Override
- public void submitGTE(GobblinEventBuilder eventBuilder,
EventSubmitterContext eventSubmitterContext) {
- eventSubmitterContext.create().submit(eventBuilder);
- }
+/**
+ * Workflow for executing an end-to-end Gobblin job, including:
+ * <ul>
+ *   <li>Work Discovery (via an arbitrary and configurable {@link org.apache.gobblin.source.Source})</li>
+ *   <li>Work Fulfillment/Processing</li>
+ *   <li>Commit</li>
+ * </ul>
+ */
+@WorkflowInterface
+public interface ExecuteGobblinWorkflow {
+ /** @return the number of {@link WorkUnit}s discovered and successfully
processed */
+ @WorkflowMethod
+ int execute(Properties props, EventSubmitterContext eventSubmitterContext);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
new file mode 100644
index 000000000..39f401a05
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Properties;
+
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.api.enums.v1.ParentClosePolicy;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.ChildWorkflowOptions;
+import io.temporal.workflow.Workflow;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
+import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+@Slf4j
+public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
+ public static final String PROCESS_WORKFLOW_ID_BASE = "ProcessWorkUnits";
+
+ public static final Duration genWUsStartToCloseTimeout =
Duration.ofMinutes(90); // TODO: make configurable
+
+ private static final RetryOptions GEN_WUS_ACTIVITY_RETRY_OPTS =
RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(3))
+ .setMaximumInterval(Duration.ofSeconds(100))
+ .setBackoffCoefficient(2)
+ .setMaximumAttempts(4)
+ .build();
+
+ private static final ActivityOptions GEN_WUS_ACTIVITY_OPTS =
ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(genWUsStartToCloseTimeout)
+ .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS)
+ .build();
+
+ private final GenerateWorkUnits genWUsActivityStub =
Workflow.newActivityStub(GenerateWorkUnits.class,
+ GEN_WUS_ACTIVITY_OPTS);
+
+ @Override
+ public int execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
+ EventTimer timer = timerFactory.createJobTimer();
+
+ int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
+ if (numWUsGenerated > 0) {
+ ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow();
+
+ JobState jobState = new JobState(jobProps);
+ URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+ Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+ WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
+ // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
+ if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
&&
+
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
+ int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+ int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+ wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
+ }
+
+ int numWUsProcessed = processWUsWorkflow.process(wuSpec);
+ if (numWUsProcessed != numWUsGenerated) {
+ log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
+        // TODO: signal this processing shortfall more robustly (decide whether it should surface as a retryable or non-retryable error)
+ }
+ }
+ timer.stop();
+ return numWUsGenerated;
+
+ }
+
+ protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow() {
+ ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+ .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(PROCESS_WORKFLOW_ID_BASE,
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+ .build();
+ return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class,
childOpts);
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
index 074bb4609..c9b330255 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
@@ -31,6 +31,7 @@ import
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWork
/** {@link
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for
{@link ProcessWorkUnit} */
public class NestingExecOfProcessWorkUnitWorkflowImpl extends
AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> {
+ public static final Duration processWorkUnitStartToCloseTimeout =
Duration.ofMinutes(20); // TODO: make configurable
// RetryOptions specify how to automatically handle retries when Activities
fail.
private static final RetryOptions ACTIVITY_RETRY_OPTS =
RetryOptions.newBuilder()
@@ -41,7 +42,7 @@ public class NestingExecOfProcessWorkUnitWorkflowImpl extends
AbstractNestingExe
.build();
private static final ActivityOptions ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(Duration.ofSeconds(999))
+ .setStartToCloseTimeout(processWorkUnitStartToCloseTimeout)
.setRetryOptions(ACTIVITY_RETRY_OPTS)
.build();
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 679a39dae..411a4b6a0 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -16,32 +16,16 @@
*/
package org.apache.gobblin.temporal.ddm.workflow.impl;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.Set;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.jetbrains.annotations.NotNull;
import com.typesafe.config.ConfigFactory;
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
+
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.instrumented.GobblinMetricsKeys;
-import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
import
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
@@ -65,29 +49,10 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
@Override
public int process(WUProcessingSpec workSpec) {
- try {
- EventSubmitterContext eventSubmitterContext =
getEventSubmitterContext(workSpec);
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
- try (EventTimer timer = timerFactory.createJobTimer()) {
- return performWork(workSpec);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @NotNull
- private EventSubmitterContext getEventSubmitterContext(WUProcessingSpec
workSpec)
- throws IOException {
- // NOTE: We are using the metrics tags from Job Props to create the metric
context for the timer and NOT
- // the deserialized jobState from HDFS that is created by the real distcp
job. This is because the AZ runtime
- // settings we want are for the job launcher that launched this Yarn job.
- FileSystem fs = Help.loadFileSystemForce(workSpec);
- JobState jobState = Help.loadJobStateUncached(workSpec, fs);
- List<Tag<?>> tagsFromCurrentJob = workSpec.getTags();
- String metricsSuffix = workSpec.getMetricsSuffix();
- List<Tag<?>> tags = getTags(tagsFromCurrentJob, metricsSuffix, jobState);
- return new EventSubmitterContext(tags, JobMetrics.NAMESPACE);
+ Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec);
+ int result = performWork(workSpec);
+ timer.ifPresent(EventTimer::stop);
+ return result;
}
private int performWork(WUProcessingSpec workSpec) {
@@ -101,7 +66,7 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
int result = commitWorkflow.commit(workSpec);
if (result == 0) {
- log.warn("No work units committed at the job level. They could be
committed at a task level.");
+ log.warn("No work units committed at the job level. They could have
been committed at the task level.");
}
return result;
} else {
@@ -110,13 +75,23 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
}
}
+ private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec
workSpec) {
+ if (workSpec.isToDoJobLevelTiming()) {
+ EventSubmitterContext eventSubmitterContext =
workSpec.getEventSubmitterContext();
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
+ return Optional.of(timerFactory.createJobTimer());
+ } else {
+ return Optional.empty();
+ }
+ }
+
protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec
workSpec) {
return new
EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(),
workSpec.getWorkUnitsDir());
}
protected NestingExecWorkflow<WorkUnitClaimCheck>
createProcessingWorkflow(FileSystemJobStateful f) {
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
- .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+ .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
.build();
// TODO: to incorporate multiple different concrete `NestingExecWorkflow`
sub-workflows in the same super-workflow... shall we use queues?!?!?
@@ -131,34 +106,4 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
}
-
- private List<Tag<?>> getTags(List<Tag<?>> tagsFromCurJob, String
metricsSuffix, JobState jobStateFromHdfs) {
- // Construct new tags list by combining subset of tags on HDFS job state
and the rest of the fields from the current job
- Map<String, Tag<?>> tagsMap = new HashMap<>();
- Set<String> tagKeysFromJobState = new HashSet<>(Arrays.asList(
- TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
- TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
- TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
- TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
- TimingEvent.FlowEventConstants.JOB_GROUP_FIELD));
-
- // Step 1, Add tags from the AZ props using the original job (the one that
launched this yarn app)
- tagsFromCurJob.forEach(tag -> tagsMap.put(tag.getKey(), tag));
-
- // Step 2. Add tags from the jobState (the original MR job on HDFS)
- List<String> targetKeysToAddSuffix =
Arrays.asList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
- GobblinMetrics.getCustomTagsFromState(jobStateFromHdfs).stream()
- .filter(tag -> tagKeysFromJobState.contains(tag.getKey()))
- .forEach(tag -> {
- // Step 2a (optional): Add a suffix to the FLOW_NAME_FIELD AND
FLOW_GROUP_FIELDS to prevent collisions when testing
- String value = targetKeysToAddSuffix.contains(tag.getKey())
- ? tag.getValue() + metricsSuffix
- : String.valueOf(tag.getValue());
- tagsMap.put(tag.getKey(), new Tag<>(tag.getKey(), value));
- });
-
- // Step 3: Overwrite any pre-existing metadata with name of the current
caller
- tagsMap.put(GobblinMetricsKeys.CLASS_META, new
Tag<>(GobblinMetricsKeys.CLASS_META, getClass().getCanonicalName()));
- return new ArrayList<>(tagsMap.values());
- }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
index 63bce421a..097e279c5 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
@@ -17,11 +17,15 @@
package org.apache.gobblin.temporal.workflows.metrics;
-import org.slf4j.Logger;
+import java.util.Map;
import io.temporal.workflow.Workflow;
+import org.slf4j.Logger;
+
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.event.TimingEvent;
public class SubmitGTEActivityImpl implements SubmitGTEActivity {
@@ -29,6 +33,17 @@ public class SubmitGTEActivityImpl implements
SubmitGTEActivity {
@Override
public void submitGTE(GobblinEventBuilder eventBuilder,
EventSubmitterContext eventSubmitterContext) {
+ log.info("submitting GTE - {}",
summarizeEventMetadataForLogging(eventBuilder));
eventSubmitterContext.create().submit(eventBuilder);
}
+
+ private static String summarizeEventMetadataForLogging(GobblinEventBuilder
eventBuilder) {
+ Map<String, String> metadata = eventBuilder.getMetadata();
+ return String.format("name: '%s'; namespace: '%s'; type: %s; start:
%s; end: %s",
+ eventBuilder.getName(),
+ eventBuilder.getNamespace(),
+ metadata.getOrDefault(EventSubmitter.EVENT_TYPE, "<<event type not
indicated>>"),
+ metadata.getOrDefault(TimingEvent.METADATA_START_TIME, "<<no start
time>>"),
+ metadata.getOrDefault(TimingEvent.METADATA_END_TIME, "<<no end
time>>"));
+ }
}