This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6f5199d84 [GOBBLIN-1945] Implement Distributed Data Movement (DDM)
Gobblin-on-Temporal `WorkUnit` evaluation (#3816)
6f5199d84 is described below
commit 6f5199d8476fb74879725d8a93e8dc6d080cb0a1
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Oct 31 23:41:45 2023 -0700
[GOBBLIN-1945] Implement Distributed Data Movement (DDM)
Gobblin-on-Temporal `WorkUnit` evaluation (#3816)
* Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit`
evaluation
* Adjust work unit processing tuning for start-to-close timeout and nested
execution branching
* Rework `ProcessWorkUnitImpl` and fix `FileSystem` misuse; plus
convenience abstractions to load `FileSystem`, `JobState`, and
`StateStore<TaskState>`
* Fix `FileSystem` resource lifecycle, uniquely name each workflow, and
drastically reduce worker concurrent task execution
* Heed findbugs advice
* prep before commit
* Improve processing of required props
* Update comment in response to PR feedback
---
.../org/apache/gobblin/configuration/State.java | 5 +-
gobblin-temporal/build.gradle | 1 +
.../temporal/ddm/activity/ProcessWorkUnit.java | 31 +++
.../ddm/activity/impl/ProcessWorkUnitImpl.java | 228 +++++++++++++++++++++
.../ddm/launcher/ProcessWorkUnitsJobLauncher.java | 96 +++++++++
.../gobblin/temporal/ddm/util/JobStateUtils.java | 98 +++++++++
.../ddm/work/AbstractEagerFsDirBackedWorkload.java | 146 +++++++++++++
...EagerFsDirBackedWorkUnitClaimCheckWorkload.java | 55 +++++
.../temporal/ddm/work/WUProcessingSpec.java | 74 +++++++
.../temporal/ddm/work/WorkUnitClaimCheck.java | 57 ++++++
.../gobblin/temporal/ddm/work/assistance/Help.java | 210 +++++++++++++++++++
.../temporal/ddm/work/styles/FileSystemApt.java | 32 +++
.../ddm/work/styles/FileSystemJobStateful.java | 23 +++
.../temporal/ddm/work/styles/JobStateful.java | 26 +++
.../temporal/ddm/worker/WorkFulfillmentWorker.java | 61 ++++++
.../ddm/workflow/ProcessWorkUnitsWorkflow.java | 32 +++
.../NestingExecOfProcessWorkUnitWorkflowImpl.java | 54 +++++
.../impl/ProcessWorkUnitsWorkflowImpl.java | 64 ++++++
.../java/org/apache/gobblin/util/HadoopUtils.java | 9 +
.../org/apache/gobblin/util/JobLauncherUtils.java | 7 +-
.../org/apache/gobblin/util/PropertiesUtils.java | 14 +-
21 files changed, 1312 insertions(+), 11 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
index eab5665c9..b3d225710 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
@@ -53,6 +53,7 @@ public class State implements WritableShim {
private static final Joiner LIST_JOINER = Joiner.on(",");
private static final Splitter LIST_SPLITTER =
Splitter.on(",").trimResults().omitEmptyStrings();
+ private static final JsonParser JSON_PARSER = new JsonParser();
private String id;
@@ -62,8 +63,6 @@ public class State implements WritableShim {
@Getter
private Properties specProperties;
- private final JsonParser jsonParser = new JsonParser();
-
public State() {
this.specProperties = new Properties();
this.commonProperties = new Properties();
@@ -476,7 +475,7 @@ public class State implements WritableShim {
* @return {@link JsonArray} value associated with the key
*/
public JsonArray getPropAsJsonArray(String key) {
- JsonElement jsonElement = this.jsonParser.parse(getProp(key));
+ JsonElement jsonElement = this.JSON_PARSER.parse(getProp(key));
Preconditions.checkArgument(jsonElement.isJsonArray(),
"Value for key " + key + " is malformed, it must be a JsonArray: " +
jsonElement);
return jsonElement.getAsJsonArray();
diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle
index 25f9bf835..1832ac909 100644
--- a/gobblin-temporal/build.gradle
+++ b/gobblin-temporal/build.gradle
@@ -27,6 +27,7 @@ dependencies {
compile project(":gobblin-api")
compile project(":gobblin-cluster")
compile project(":gobblin-core")
+ compile project(":gobblin-data-management")
compile project(":gobblin-metrics-libs:gobblin-metrics")
compile project(":gobblin-metastore")
compile project(":gobblin-runtime")
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ProcessWorkUnit.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ProcessWorkUnit.java
new file mode 100644
index 000000000..bdd9772da
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ProcessWorkUnit.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+
+
+/** Activity for processing/executing a {@link
org.apache.gobblin.source.workunit.WorkUnit}, provided by claim-check */
+@ActivityInterface
+public interface ProcessWorkUnit {
+ @ActivityMethod
+ // CAUTION: void return type won't work, as apparently it mayn't be the
return type for `io.temporal.workflow.Functions.Func1`!
+ int processWorkUnit(WorkUnitClaimCheck wu);
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
new file mode 100644
index 000000000..640d78da1
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.AbstractTaskStateTracker;
+import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.Task;
+import org.apache.gobblin.runtime.TaskCreationException;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateTracker;
+import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
+import org.apache.gobblin.runtime.troubleshooter.NoopAutomaticTroubleshooter;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.util.JobLauncherUtils;
+
+
+@Slf4j
+public class ProcessWorkUnitImpl implements ProcessWorkUnit {
+ private static final int LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE = 100;
+
+ @Override
+ public int processWorkUnit(WorkUnitClaimCheck wu) {
+ try (FileSystem fs = Help.loadFileSystemForce(wu)) {
+ List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
+ log.info("WU [{}] - loaded {} workUnits", wu.getCorrelator(),
workUnits.size());
+ JobState jobState = Help.loadJobState(wu, fs);
+ return execute(workUnits, wu, jobState, fs);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected List<WorkUnit> loadFlattenedWorkUnits(WorkUnitClaimCheck wu,
FileSystem fs) throws IOException {
+ Path wuPath = new Path(wu.getWorkUnitPath());
+ WorkUnit workUnit =
JobLauncherUtils.createEmptyWorkUnitPerExtension(wuPath);
+ Help.deserializeStateWithRetries(fs, wuPath, workUnit, wu);
+ return JobLauncherUtils.flattenWorkUnits(Lists.newArrayList(workUnit));
+ }
+
+ /**
+ * NOTE: adapted from {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
+ * @return count of how many tasks executed (0 if execution ultimately
failed, but we *believe* TaskState should already have been recorded beforehand)
+ */
+ protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu,
JobState jobState, FileSystem fs) throws IOException, InterruptedException {
+ String containerId = "container-id-for-wu-" + wu.getCorrelator();
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
+
+ TaskStateTracker taskStateTracker =
createEssentializedTaskStateTracker(wu);
+ TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+ GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy =
GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; // as no speculative exec
+
+ SharedResourcesBroker<GobblinScopeTypes> resourcesBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
+ AutomaticTroubleshooter troubleshooter = new NoopAutomaticTroubleshooter();
+ //
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(wu.getStateConfig().getProperties()));
+ troubleshooter.start();
+
+ List<String> fileSourcePaths = workUnits.stream()
+ .map(workUnit -> describeAsCopyableFile(workUnit,
wu.getWorkUnitPath()))
+ .collect(Collectors.toList());
+ log.info("WU [{}] - submitting {} workUnits for copying files: {}",
wu.getCorrelator(),
+ workUnits.size(), fileSourcePaths);
+ log.debug("WU [{}] - (first) workUnit: {}", wu.getCorrelator(),
workUnits.get(0).toJsonString());
+
+ try {
+ GobblinMultiTaskAttempt taskAttempt =
GobblinMultiTaskAttempt.runWorkUnits(
+ jobState.getJobId(), containerId, jobState, workUnits,
+ taskStateTracker, taskExecutor, taskStateStore,
multiTaskAttemptCommitPolicy,
+ resourcesBroker, troubleshooter.getIssueRepository(),
createInterruptionPredicate(fs, jobState));
+ return taskAttempt.getNumTasksCreated();
+ } catch (TaskCreationException tce) { // derived type of `IOException`
that ought not be caught!
+ throw tce;
+ } catch (IOException ioe) {
+ // presume execution already occurred, with `TaskState` written to
reflect outcome
+ log.warn("WU [" + wu.getCorrelator() + "] - continuing on despite
IOException:", ioe);
+ return 0;
+ }
+ }
+
+ /** Demonstration processing, to isolate debugging of WU loading and
deserialization */
+ protected int countSumProperties(List<WorkUnit> workUnits,
WorkUnitClaimCheck wu) {
+ int totalNumProps = workUnits.stream().mapToInt(workUnit ->
workUnit.getPropertyNames().size()).sum();
+ log.info("opened WU [{}] to find {} properties total at '{}'",
wu.getCorrelator(), totalNumProps, wu.getWorkUnitPath());
+ return totalNumProps;
+ }
+
+ protected TaskStateTracker
createEssentializedTaskStateTracker(WorkUnitClaimCheck wu) {
+ return new AbstractTaskStateTracker(new Properties(), log) {
+ @Override
+ public void registerNewTask(Task task) {
+ // TODO: shall we schedule metrics update based on config?
+ }
+
+ @Override
+ public void onTaskRunCompletion(Task task) {
+ task.markTaskCompletion();
+ }
+
+ @Override
+ public void onTaskCommitCompletion(Task task) {
+ TaskState taskState = task.getTaskState();
+ // TODO: if metrics configured, report them now
+ log.info("WU [{} = {}] - finished commit after {}ms with state {}{}",
wu.getCorrelator(), task.getTaskId(),
+ taskState.getTaskDuration(), taskState.getWorkingState(),
+
taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL)
+ ? (" to: " +
taskState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)) : "");
+ log.debug("WU [{} = {}] - task state: {}", wu.getCorrelator(),
task.getTaskId(),
+ taskState.toJsonString(shouldUseExtendedLogging(wu)));
+ getOptCopyableFile(taskState).ifPresent(copyableFile -> {
+ log.info("WU [{} = {}] - completed copyableFile: {}",
wu.getCorrelator(), task.getTaskId(),
+ copyableFile.toJsonString(shouldUseExtendedLogging(wu)));
+ });
+ }
+ };
+ }
+
+ protected String describeAsCopyableFile(WorkUnit workUnit, String
workUnitPath) {
+ return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
+ .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
+ .orElse(
+ "<<not a CopyableFile("
+ + getOptCopyEntityClass(workUnit, workUnitPath)
+ .map(Class::getSimpleName)
+ .orElse("<<not a CopyEntity!>>")
+ + "): '" + workUnitPath + "'"
+ );
+ }
+
+ protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
+ return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId()
+ "'");
+ }
+
+ protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit>
workUnits, String workUnitPath) {
+ return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
+ getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
+ );
+ }
+
+ protected Optional<CopyableFile> getOptCopyableFile(State state, String
logDesc) {
+ return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
+ log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(),
copyEntityClass.getName());
+ if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+ String serialization =
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+ if (serialization != null) {
+ return Optional.of((CopyableFile)
CopyEntity.deserialize(serialization));
+ }
+ }
+ return Optional.empty();
+ });
+ }
+
+ protected Optional<Class<?>> getOptCopyEntityClass(State state, String
logDesc) {
+ try {
+ return Optional.of(CopySource.getCopyEntityClass(state));
+ } catch (IOException ioe) {
+ log.warn(logDesc + " - failed getting copy entity class:", ioe);
+ return Optional.empty();
+ }
+ }
+
+ protected Predicate<GobblinMultiTaskAttempt>
createInterruptionPredicate(FileSystem fs, JobState jobState) {
+ // TODO - decide whether to support... and if so, employ a useful path;
otherwise, just evaluate predicate to always false
+ Path interruptionPath = new
Path("/not/a/real/path/that/should/ever/exist!");
+ return createInterruptionPredicate(fs, interruptionPath);
+ }
+
+ protected Predicate<GobblinMultiTaskAttempt>
createInterruptionPredicate(FileSystem fs, Path interruptionPath) {
+ return (gmta) -> {
+ try {
+ return fs.exists(interruptionPath);
+ } catch (IOException ioe) {
+ return false;
+ }
+ };
+ }
+
+ protected boolean shouldUseExtendedLogging(WorkUnitClaimCheck wu) {
+ try {
+ return Long.parseLong(wu.getCorrelator()) %
LOG_EXTENDED_PROPS_EVERY_WORK_UNITS_STRIDE == 0;
+ } catch (NumberFormatException nfe) {
+ log.warn("unexpected, non-numeric correlator: '{}'", wu.getCorrelator());
+ return false;
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
new file mode 100644
index 000000000..95425a643
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.launcher;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.typesafe.config.ConfigFactory;
+import io.temporal.client.WorkflowOptions;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.util.PropertiesUtils;
+
+import static
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;
+
+
+/**
+ * A {@link JobLauncher} for the initial triggering of a Temporal workflow
that executes {@link WorkUnit}s to fulfill
+ * the work they specify. see: {@link ProcessWorkUnitsWorkflow}
+ *
+ * <p>
+ * This class is instantiated by the {@link
GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job
submission to launch the Gobblin job.
+ * The actual task execution happens in the {@link
GobblinTemporalTaskRunner}, usually in a different process.
+ * </p>
+ */
+@Slf4j
+public class ProcessWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {
+ public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "name.node.uri";
+ public static final String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR
= GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.units.dir";
+
+ public static final String
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.max.branches.per.tree";
+ public static final String
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX + "work.max.sub.trees.per.tree";
+
+ public static final String WORKFLOW_ID_BASE = "ProcessWorkUnits";
+
+ public ProcessWorkUnitsJobLauncher(
+ Properties jobProps,
+ Path appWorkDir,
+ List<? extends Tag<?>> metadataTags,
+ ConcurrentHashMap<String, Boolean> runningMap
+ ) throws Exception {
+ super(jobProps, appWorkDir, metadataTags, runningMap);
+ }
+
+ @Override
+ public void submitJob(List<WorkUnit> workunits) {
+ try {
+ URI nameNodeUri = new URI(PropertiesUtils.getRequiredProp(this.jobProps,
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_NAME_NODE_URI));
+ // NOTE: `Path` is challenging for temporal to ser/de, but nonetheless
do pre-construct as `Path`, to pre-validate this prop string's contents
+ Path workUnitsDir = new
Path(PropertiesUtils.getRequiredProp(this.jobProps,
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_UNITS_DIR));
+ WUProcessingSpec wuSpec = new WUProcessingSpec(nameNodeUri,
workUnitsDir.toString());
+ if
(this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
&&
+
this.jobProps.containsKey(GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
+ int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(this.jobProps,
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+ int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(this.jobProps,
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+ wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
+ }
+ WorkflowOptions options = WorkflowOptions.newBuilder()
+ .setTaskQueue(this.queueName)
+ .setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec,
ConfigFactory.parseProperties(jobProps)))
+ .build();
+ ProcessWorkUnitsWorkflow workflow =
this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
+ workflow.process(wuSpec);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
new file mode 100644
index 000000000..87d91e0c4
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+
+
+/**
+ * Utilities for applying {@link JobState} info to various ends:
+ * - creating a {@link SharedResourcesBroker}
+ * - obtaining a {@link StateStore<TaskState>}
+ */
+@Slf4j
+public class JobStateUtils {
+ private static final String OUTPUT_DIR_NAME = "output"; // following
MRJobLauncher.OUTPUT_DIR_NAME
+
+ // reuse same handle among activities executed by the same worker
+ private static final transient Cache<Path, StateStore<TaskState>>
taskStateStoreByPath = CacheBuilder.newBuilder().build();
+
+ private JobStateUtils() {}
+
+ public static StateStore<TaskState> openTaskStateStore(JobState jobState,
FileSystem fs) {
+ try {
+ Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState,
fs);
+ return taskStateStoreByPath.get(taskStateStorePath, () ->
+ openTaskStateStoreUncached(jobState, fs)
+ );
+ } catch (ExecutionException ee) {
+ throw new RuntimeException(ee);
+ }
+ }
+
+ public static StateStore<TaskState> openTaskStateStoreUncached(JobState
jobState, FileSystem fs) {
+ Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState,
fs);
+ log.info("opening FS task state store at path '{}'", taskStateStorePath);
+ return new FsStateStore<>(fs, taskStateStorePath.toUri().getPath(),
TaskState.class);
+ }
+
+ /**
+ * ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+ * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+ * @return path to {@link FsStateStore<TaskState>} backing dir
+ */
+ public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) {
+ Properties jobProps = jobState.getProperties();
+ Path jobOutputPath = new Path(
+ new Path(
+ new Path(
+ jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY),
+ JobState.getJobNameFromProps(jobProps)),
+ JobState.getJobIdFromProps(jobProps)),
+ OUTPUT_DIR_NAME);
+ return fs.makeQualified(jobOutputPath);
+ }
+
+ public static SharedResourcesBroker<GobblinScopeTypes>
getSharedResourcesBroker(JobState jobState) {
+ SharedResourcesBroker<GobblinScopeTypes> globalBroker =
+ SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+ ConfigFactory.parseProperties(jobState.getProperties()),
+ GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+ return globalBroker.newSubscopedBuilder(new
JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build();
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
new file mode 100644
index 000000000..f6b6e05f1
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
+import org.apache.gobblin.temporal.util.nesting.work.SeqSliceBackedWorkSpan;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+import org.apache.gobblin.util.HadoopUtils;
+
+
+/**
+ * {@link Workload} of `WORK_ITEM`s (as defined by derived class) that
originates from the eagerly loaded contents of
+ * the directory `fsDir` within the {@link FileSystem} at `nameNodeUri`.
+ *
+ * IMPORTANT: to abide by Temporal's required determinism, a derived class
must provide a {@link Comparator} for the
+ * *total ordering* of `WORK_ITEM`s.
+ */
[email protected] // IMPORTANT: for jackson (de)serialization
[email protected]
[email protected](exclude = { "stateConfig", "cachedWorkItems" })
+@Slf4j
+public abstract class AbstractEagerFsDirBackedWorkload<WORK_ITEM> implements
Workload<WORK_ITEM>, FileSystemApt {
+
+ @Getter
+ @NonNull private URI fileSystemUri;
+ // NOTE: use `String` rather than `Path` to avoid:
com.fasterxml.jackson.databind.exc.MismatchedInputException:
+ // Cannot construct instance of `org.apache.hadoop.fs.Path` (although at
least one Creator exists):
+ // cannot deserialize from Object value (no delegate- or property-based
Creator)
+ @NonNull private String fsDir;
+ @Getter(AccessLevel.PROTECTED) @Setter(AccessLevel.PROTECTED)
+ private transient volatile WORK_ITEM[] cachedWorkItems = null;
+
+ @Override
+ public Optional<Workload.WorkSpan<WORK_ITEM>> getSpan(final int startIndex,
final int numElements) {
+ WORK_ITEM[] workItems = getCachedWorkItems();
+ if (startIndex >= workItems.length || startIndex < 0) {
+ return Optional.empty();
+ } else {
+ return Optional.of(new SeqSliceBackedWorkSpan<>(workItems, startIndex,
numElements));
+ }
+ }
+
+ @Override
+ public boolean isIndexKnownToExceed(final int index) {
+ return isDefiniteSize() && cachedWorkItems != null && index >=
cachedWorkItems.length;
+ }
+
+ @Override
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ public boolean isDefiniteSize() {
+ return true;
+ }
+
+ protected abstract WORK_ITEM fromFileStatus(FileStatus fileStatus);
+
+ /**
+ * IMPORTANT: to satisfy Temporal's required determinism, the `WORK_ITEM`s
need a consistent total ordering
+ * WARNING: this works so long as dir contents are unchanged in the interim
+ * TODO: handle case of dir contents growing (e.g. use timestamp to filter
out newer paths)... how could we handle the case of shrinking/deletion?
+ */
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ protected abstract Comparator<WORK_ITEM> getWorkItemComparator();
+
+ /** Hook for each `WORK_ITEM` to be associated with its final, post-sorting
ordinal index */
+ protected void acknowledgeOrdering(int index, WORK_ITEM workItem) {
+ // no-op
+ }
+
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ protected PathFilter getPathFilter() {
+ return f -> true;
+ }
+
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ protected final synchronized WORK_ITEM[] getCachedWorkItems() {
+ if (cachedWorkItems != null) {
+ return cachedWorkItems;
+ }
+ try (FileSystem fs = loadFileSystem()) {
+ FileStatus[] fileStatuses = fs.listStatus(new Path(fsDir),
this.getPathFilter());
+ log.info("loaded {} paths from '{}'", fileStatuses.length, fsDir);
+ WORK_ITEM[] workItems =
(WORK_ITEM[])Stream.of(fileStatuses).map(this::fromFileStatus).toArray(Object[]::new);
+ sortWorkItems(workItems);
+ IntStream.range(0, workItems.length)
+ .forEach(i -> this.acknowledgeOrdering(i, workItems[i]));
+ cachedWorkItems = workItems;
+ return cachedWorkItems;
+ } catch (FileNotFoundException fnfe) {
+ throw new RuntimeException("directory not found: '" + fsDir + "'");
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ @Override
+ public State getFileSystemConfig() {
+ return new State(); // TODO - figure out how to truly set!
+ }
+
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ protected FileSystem loadFileSystem() throws IOException {
+ return HadoopUtils.getFileSystem(this.fileSystemUri,
this.getFileSystemConfig());
+ }
+
+ private void sortWorkItems(WORK_ITEM[] workItems) {
+ Arrays.sort(workItems, getWorkItemComparator());
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
new file mode 100644
index 000000000..d2c193cb1
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.net.URI;
+import java.util.Comparator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.hadoop.fs.FileStatus;
+
+
/**
 * {@link AbstractEagerFsDirBackedWorkload} for {@link WorkUnitClaimCheck} `WORK_ITEM`s, which uses
 * {@link WorkUnitClaimCheck#getWorkUnitPath()} for their total-ordering.
 */
@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@lombok.ToString(callSuper = true)
public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends AbstractEagerFsDirBackedWorkload<WorkUnitClaimCheck> {

  public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String hdfsDir) {
    super(fileSystemUri, hdfsDir);
  }

  @Override
  protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) {
    // begin by setting all correlators to empty; real values assigned later in `acknowledgeOrdering`
    return new WorkUnitClaimCheck("", this.getFileSystemUri(), fileStatus.getPath().toString());
  }

  /** Total-order claim checks by their work-unit path. */
  @Override
  @JsonIgnore // (because no-arg method resembles 'java bean property')
  protected Comparator<WorkUnitClaimCheck> getWorkItemComparator() {
    return Comparator.comparing(WorkUnitClaimCheck::getWorkUnitPath);
  }

  @Override
  protected void acknowledgeOrdering(int index, WorkUnitClaimCheck item) {
    // later, after the post-total-ordering indices are known, use each item's index as its correlator
    item.setCorrelator(Integer.toString(index));
  }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
new file mode 100644
index 000000000..3b2597194
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.net.URI;
+import java.util.Optional;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+
+
/**
 * Intended to reference multiple {@link org.apache.gobblin.source.workunit.WorkUnit}s to process, where
 * `workUnitsDir` is resolved against the {@link org.apache.hadoop.fs.FileSystem} given by `fileSystemUri`.
 */
@Data
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class WUProcessingSpec implements FileSystemApt, FileSystemJobStateful {
  @NonNull private URI fileSystemUri;
  @NonNull private String workUnitsDir;
  @NonNull private Tuning tuning = Tuning.DEFAULT;

  /** @return configuration for loading the {@link org.apache.hadoop.fs.FileSystem}; currently always empty */
  @JsonIgnore // (because no-arg method resembles 'java bean property')
  @Override
  public State getFileSystemConfig() {
    return new State(); // TODO - figure out how to truly set!
  }

  @JsonIgnore // (because no-arg method resembles 'java bean property')
  @Override
  public Path getJobStatePath() {
    // TODO: decide whether wise to hard-code... (per `MRJobLauncher` conventions, we expect job state file to be sibling of WU dir)
    return new Path(new Path(workUnitsDir).getParent(), AbstractJobLauncher.JOB_STATE_FILE_NAME);
  }

  /** Configuration for {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(WorkflowAddr, Workload, int, int, int, Optional)}*/
  @Data
  @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
  @RequiredArgsConstructor
  public static class Tuning {
    public static final int DEFAULT_MAX_BRANCHES_PER_TREE = 900;
    public static final int DEFAULT_SUB_TREES_PER_TREE = 30;

    public static final Tuning DEFAULT = new Tuning(DEFAULT_MAX_BRANCHES_PER_TREE, DEFAULT_SUB_TREES_PER_TREE);

    // NOTE(review): `@NonNull` on a primitive generates no null-check; presumably it is here only to
    // include these fields in the lombok `@RequiredArgsConstructor` -- confirm that is the intent
    @NonNull private int maxBranchesPerTree;
    @NonNull private int maxSubTreesPerTree;
  }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
new file mode 100644
index 000000000..f03215664
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.net.URI;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+
/**
 * Conveys a {@link org.apache.gobblin.source.workunit.WorkUnit} by claim-check, where the `workUnitPath` is resolved
 * against the {@link org.apache.hadoop.fs.FileSystem} given by `fileSystemUri`.
 * @see <a href="https://learn.microsoft.com/en-us/azure/architecture/patterns/claim-check">Claim-Check Pattern</a>
 */
@Data
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class WorkUnitClaimCheck implements FileSystemApt, FileSystemJobStateful {
  // `correlator`: stable ID tying this claim check back to its post-total-ordering workload index
  @NonNull private String correlator;
  @NonNull private URI fileSystemUri;
  @NonNull private String workUnitPath;

  /** @return configuration for loading the {@link org.apache.hadoop.fs.FileSystem}; currently always empty */
  @JsonIgnore // (because no-arg method resembles 'java bean property')
  @Override
  public State getFileSystemConfig() {
    return new State(); // TODO - figure out how to truly set!
  }

  @JsonIgnore // (because no-arg method resembles 'java bean property')
  @Override
  public Path getJobStatePath() {
    // TODO: decide whether wise to hard-code... (per `MRJobLauncher` conventions, we expect job state file to be sibling of WU dir)
    // (two `getParent()` calls: `workUnitPath` lies inside the WU dir, and the job-state file is that dir's sibling)
    return new Path(new Path(workUnitPath).getParent().getParent(), AbstractJobLauncher.JOB_STATE_FILE_NAME);
  }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
new file mode 100644
index 000000000..f382dcec1
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work.assistance;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutionException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import com.typesafe.config.Config;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
+import org.apache.gobblin.temporal.ddm.work.styles.JobStateful;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.SerializationUtils;
+
+
+/** Various capabilities useful in implementing Distributed Data Movement
(DDM) */
+@Slf4j
+public class Help {
+ public static final int MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS = 5;
+ public static final int LOG_CACHE_STATS_EVERY_N_ACCESSES = 1000;
+ public static final String AZKABAN_FLOW_EXEC_ID_KEY = "azkaban.flow.execid";
+ public static final String USER_TO_PROXY_KEY = "user.to.proxy";
+
+ // treat `JobState` as immutable and cache, for reuse among activities
executed by the same worker
+ private static final transient Cache<Path, JobState> jobStateByPath =
CacheBuilder.newBuilder().recordStats().build();
+ private static final transient AtomicInteger jobStateAccessCount = new
AtomicInteger(0);
+
+ private Help() {}
+
+ public static String qualifyNamePerExec(String name, FileSystemJobStateful
f, Config workerConfig) {
+ return name + "_" + calcPerExecQualifier(f, workerConfig);
+ }
+
+ public static String qualifyNamePerExec(String name, Config workerConfig) {
+ return name + "_" + calcPerExecQualifier(workerConfig);
+ }
+
+ public static String calcPerExecQualifier(FileSystemJobStateful f, Config
workerConfig) {
+ Optional<String> optFlowExecId = Optional.empty();
+ try {
+ optFlowExecId =
Optional.of(loadJobState(f).getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
null));
+ } catch (IOException e) {
+ log.warn("unable to loadJobState", e);
+ }
+ return optFlowExecId.map(x -> x + "_").orElse("") +
calcPerExecQualifier(workerConfig);
+ }
+
+ public static String calcPerExecQualifier(Config workerConfig) {
+ String userToProxy = workerConfig.hasPath(USER_TO_PROXY_KEY)
+ ? workerConfig.getString(USER_TO_PROXY_KEY) : "";
+ String azFlowExecId = workerConfig.hasPath(AZKABAN_FLOW_EXEC_ID_KEY)
+ ? workerConfig.getString(AZKABAN_FLOW_EXEC_ID_KEY) :
UUID.randomUUID().toString();
+ return userToProxy + "_" + azFlowExecId;
+ }
+
+ public static FileSystem loadFileSystem(FileSystemApt a) throws IOException {
+ // NOTE: `FileSystem.get` appears to implement caching, which should
facilitate sharing among activities executing on the same worker
+ return loadFileSystemForUri(a.getFileSystemUri(), a.getFileSystemConfig());
+ }
+
+ public static FileSystem loadFileSystemForUri(URI fsUri, State fsConfig)
throws IOException {
+ // TODO - determine whether this works... unclear whether it led to "FS
closed", or that had another cause...
+ // return HadoopUtils.getFileSystem(fsUri, fsConfig);
+ Configuration conf = HadoopUtils.getConfFromState(fsConfig);
+ return FileSystem.get(fsUri, conf);
+ }
+
+ public static FileSystem loadFileSystemForce(FileSystemApt a) throws
IOException {
+ return loadFileSystemForUriForce(a.getFileSystemUri(),
a.getFileSystemConfig());
+ }
+
+ public static FileSystem loadFileSystemForUriForce(URI fsUri, State
fsConfig) throws IOException {
+ // for reasons still not fully understood, we encountered many "FS closed"
failures before disabling HDFS caching--especially as num WUs increased.
+ // perhaps caching-facilitated reuse of the same FS across multiple WUs
caused prior WU execs to leave the FS in a problematic state for subsequent
execs
+ // TODO - more investigation to sort out the true RC... and whether
caching definitively is or is not possible for use here!
+ // return HadoopUtils.getFileSystem(fsUri, fsConfig);
+ Configuration conf = HadoopUtils.getConfFromState(fsConfig);
+ conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+ return FileSystem.get(fsUri, conf);
+ }
+
+ public static JobState loadJobState(FileSystemJobStateful f) throws
IOException {
+ try (FileSystem fs = loadFileSystemForce(f)) {
+ return loadJobState(f, fs);
+ }
+ }
+
+ public static JobState loadJobState(JobStateful js, FileSystem fs) throws
IOException {
+ try {
+ incrementJobStateAccess();
+ return jobStateByPath.get(js.getJobStatePath(), () ->
+ loadJobStateUncached(js, fs)
+ );
+ } catch (ExecutionException ee) {
+ throw new IOException(ee);
+ }
+ }
+
+ public static JobState loadJobStateUncached(JobStateful js, FileSystem fs)
throws IOException {
+ JobState jobState = new JobState();
+ SerializationUtils.deserializeState(fs, js.getJobStatePath(), jobState);
+ log.info("loaded jobState from '{}': {}", js.getJobStatePath(),
jobState.toJsonString(true));
+ return jobState;
+ }
+
+ public static JobState loadJobStateWithRetries(FileSystemJobStateful f)
throws IOException {
+ try (FileSystem fs = loadFileSystemForce(f)) {
+ return loadJobStateWithRetries(f, fs);
+ }
+ }
+
+ public static JobState loadJobStateWithRetries(FileSystemJobStateful f,
FileSystem fs) throws IOException {
+ try {
+ incrementJobStateAccess();
+ return jobStateByPath.get(f.getJobStatePath(), () ->
+ loadJobStateUncachedWithRetries(f, fs, f)
+ );
+ } catch (ExecutionException ee) {
+ throw new IOException(ee);
+ }
+ }
+
+ public static JobState loadJobStateUncachedWithRetries(JobStateful js,
FileSystem fs, FileSystemApt fsApt) throws IOException {
+ JobState jobState = new JobState();
+ deserializeStateWithRetries(fs, js.getJobStatePath(), jobState, fsApt,
MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS);
+ log.info("loaded jobState from '{}': {}", js.getJobStatePath(),
jobState.toJsonString(true));
+ return jobState;
+ }
+
+ public static <T extends State> void deserializeStateWithRetries(FileSystem
fs, Path path, T state, FileSystemApt fsApt)
+ throws IOException {
+ deserializeStateWithRetries(fs, path, state, fsApt,
MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS);
+ }
+
+ // TODO: decide whether actually necessary... it was added in a fit of
debugging "FS closed" errors
+ public static <T extends State> void deserializeStateWithRetries(FileSystem
fs, Path path, T state, FileSystemApt fsApt, int maxAttempts)
+ throws IOException {
+ for (int i = 0; i < maxAttempts; ++i) {
+ if (i > 0) {
+ log.info("reopening FS '{}' to retry ({}) deserialization (attempt
{})", fsApt.getFileSystemUri(),
+ state.getClass().getSimpleName(), i);
+ fs = Help.loadFileSystem(fsApt);
+ }
+ try {
+ SerializationUtils.deserializeState(fs, path, state);
+ return;
+ } catch (IOException ioe) {
+ if (ioe.getMessage().equals("Filesystem closed") && i < maxAttempts -
1) {
+ continue;
+ } else {
+ throw ioe;
+ }
+ }
+ }
+ }
+
+ public static StateStore<TaskState> openTaskStateStore(FileSystemJobStateful
f) throws IOException {
+ try (FileSystem fs = Help.loadFileSystem(f)) {
+ return JobStateUtils.openTaskStateStore(Help.loadJobState(f, fs), fs);
+ }
+ }
+
+ public static StateStore<TaskState> openTaskStateStore(FileSystemJobStateful
js, FileSystem fs) throws IOException {
+ return JobStateUtils.openTaskStateStoreUncached(loadJobState(js), fs);
+ // public static StateStore<TaskState> openTaskStateStore(JobStateful js,
FileSystem fs) throws IOException {
+ // return JobStateUtils.openTaskStateStore(loadJobState(js, fs), fs);
+ }
+
+ private static void incrementJobStateAccess() {
+ int numAccesses = jobStateAccessCount.getAndIncrement();
+ if (numAccesses % LOG_CACHE_STATS_EVERY_N_ACCESSES == 0) {
+ log.info("JobState(numAccesses: {}) - {}", numAccesses,
jobStateByPath.stats());
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemApt.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemApt.java
new file mode 100644
index 000000000..324aae04d
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemApt.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work.styles;
+
+import java.net.URI;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.gobblin.configuration.State;
+
+
/** Marks a type that can indicate a {@link org.apache.hadoop.fs.FileSystem} via its {@link URI} and configuration */
public interface FileSystemApt {

  /** @return the {@link URI} identifying the {@link org.apache.hadoop.fs.FileSystem} */
  URI getFileSystemUri();

  /** @return the {@link State} configuration for loading/connecting to that file system */
  @JsonIgnore // (because no-arg method resembles 'java bean property')
  State getFileSystemConfig();
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemJobStateful.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemJobStateful.java
new file mode 100644
index 000000000..d3b0accdb
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/FileSystemJobStateful.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work.styles;
+
+
/**
 * Marks a type that can indicate both a {@link org.apache.hadoop.fs.FileSystem} and a
 * {@link org.apache.gobblin.runtime.JobState} -- simply the union of its two super-interfaces.
 */
public interface FileSystemJobStateful extends JobStateful, FileSystemApt {
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/JobStateful.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/JobStateful.java
new file mode 100644
index 000000000..3dc7aed32
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/styles/JobStateful.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work.styles;
+
+import org.apache.hadoop.fs.Path;
+
+
/** Marks a type that can indicate a {@link org.apache.gobblin.runtime.JobState} via its {@link Path} */
public interface JobStateful {

  /** @return the {@link Path} at which the serialized {@link org.apache.gobblin.runtime.JobState} resides */
  Path getJobStatePath();
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
new file mode 100644
index 000000000..9af6995b5
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.worker;
+
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+import io.temporal.client.WorkflowClient;
+import io.temporal.worker.WorkerOptions;
+
+import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
+import
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
+import
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
+
+
+/** Worker for the {@link ProcessWorkUnitsWorkflowImpl} super-workflow */
+public class WorkFulfillmentWorker extends AbstractTemporalWorker {
+ public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120;
+ public static final int MAX_EXECUTION_CONCURRENCY = 3;
+
+ public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient)
{
+ super(config, workflowClient);
+ }
+
+ @Override
+ protected Class<?>[] getWorkflowImplClasses() {
+ return new Class[] { ProcessWorkUnitsWorkflowImpl.class,
NestingExecOfProcessWorkUnitWorkflowImpl.class };
+ }
+
+ @Override
+ protected Object[] getActivityImplInstances() {
+ return new Object[] { new ProcessWorkUnitImpl() };
+ }
+
+ @Override
+ protected WorkerOptions createWorkerOptions() {
+ return WorkerOptions.newBuilder()
+ // default is only 1s - WAY TOO SHORT for
`o.a.hadoop.fs.FileSystem#listStatus`!
+
.setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS))
+ .setMaxConcurrentActivityExecutionSize(MAX_EXECUTION_CONCURRENCY)
+
.setMaxConcurrentLocalActivityExecutionSize(MAX_EXECUTION_CONCURRENCY)
+
.setMaxConcurrentWorkflowTaskExecutionSize(MAX_EXECUTION_CONCURRENCY)
+ .build();
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
new file mode 100644
index 000000000..ba2ccf99a
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow;
+
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+
+
/** Temporal workflow for executing {@link WorkUnit}s to fulfill the work they specify. */
@WorkflowInterface
public interface ProcessWorkUnitsWorkflow {
  /**
   * Process every {@link WorkUnit} identified by `wuSpec`.
   * @return the number of {@link WorkUnit}s cumulatively processed successfully
   */
  @WorkflowMethod
  int process(WUProcessingSpec wuSpec);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
new file mode 100644
index 000000000..074bb4609
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.Async;
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.Workflow;
+import java.time.Duration;
+
+import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;
+
+
+/** {@link
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for
{@link ProcessWorkUnit} */
+public class NestingExecOfProcessWorkUnitWorkflowImpl extends
AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> {
+
+ // RetryOptions specify how to automatically handle retries when Activities
fail.
+ private static final RetryOptions ACTIVITY_RETRY_OPTS =
RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(3))
+ .setMaximumInterval(Duration.ofSeconds(100))
+ .setBackoffCoefficient(2)
+ .setMaximumAttempts(4)
+ .build();
+
+ private static final ActivityOptions ACTIVITY_OPTS =
ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(Duration.ofSeconds(999))
+ .setRetryOptions(ACTIVITY_RETRY_OPTS)
+ .build();
+
+ private final ProcessWorkUnit activityStub =
Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS);
+
+ @Override
+ protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu) {
+ return Async.function(activityStub::processWorkUnit, wu);
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
new file mode 100644
index 000000000..eafc62409
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.util.Optional;
+
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.api.enums.v1.ParentClosePolicy;
+import io.temporal.workflow.ChildWorkflowOptions;
+import io.temporal.workflow.Workflow;
+
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
+import
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
+import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+
+
+public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
+ public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
+
+ @Override
+ public int process(WUProcessingSpec workSpec) {
+ Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
+ NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec);
+ return processingWorkflow.performWorkload(
+ WorkflowAddr.ROOT, workload, 0,
+ workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
+ );
+ }
+
+ protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec
workSpec) {
+ return new
EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(),
workSpec.getWorkUnitsDir());
+ }
+
+ protected NestingExecWorkflow<WorkUnitClaimCheck>
createProcessingWorkflow(FileSystemJobStateful f) {
+ ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+ .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+ .setWorkflowId(Help.qualifyNamePerExec(CHILD_WORKFLOW_ID_BASE, f,
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+ .build();
+ // TODO: to incorporate multiple different concrete `NestingExecWorkflow`
sub-workflows in the same super-workflow... shall we use queues?!?!?
+ return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
+ }
+}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
index 3e020f6f8..012795dd1 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
@@ -1029,6 +1029,15 @@ public class HadoopUtils {
return
HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri),
conf), state);
}
+ /**
+ * Get a {@link FileSystem} for `fsUri`
+ * @throws IOException
+ */
+ public static FileSystem getFileSystem(URI fsUri, State state) throws
IOException {
+ Configuration conf = HadoopUtils.getConfFromState(state,
Optional.absent());
+ return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(fsUri,
conf), state);
+ }
+
/**
* Get a {@link FileSystem} object for the uri specified at {@link
ConfigurationKeys#WRITER_FILE_SYSTEM_URI}.
* @throws IOException
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 1f20b1b73..89bf4d4bd 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -130,12 +130,7 @@ public class JobLauncherUtils {
public static List<WorkUnit> loadFlattenedWorkUnits(FileSystem fs, Path
path) throws IOException {
WorkUnit workUnit = JobLauncherUtils.createEmptyWorkUnitPerExtension(path);
SerializationUtils.deserializeState(fs, path, workUnit);
-
- if (workUnit.isMultiWorkUnit()) {
- return JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit)
workUnit).getWorkUnits());
- } else {
- return Lists.newArrayList(workUnit);
- }
+ return JobLauncherUtils.flattenWorkUnits(Lists.newArrayList(workUnit));
}
/** @return an empty {@link WorkUnit}, potentially an empty {@link
MultiWorkUnit}, based on the {@link Path} extension */
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index d6361b927..0aaed3729 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -72,9 +72,19 @@ public class PropertiesUtils {
/** @throws {@link NullPointerException} when `key` not in `properties` */
public static int getRequiredPropAsInt(Properties properties, String key) {
+ return Integer.parseInt(getRequiredPropRaw(properties, key,
Optional.of("an integer")));
+ }
+
+ /** @throws {@link NullPointerException} when `key` not in `properties` */
+ public static String getRequiredProp(Properties properties, String key) {
+ return getRequiredPropRaw(properties, key, Optional.absent());
+ }
+
+ /** @throws {@link NullPointerException} when `key` not in `properties` */
+ public static String getRequiredPropRaw(Properties properties, String key,
Optional<String> desc) {
String value = properties.getProperty(key);
- Preconditions.checkNotNull(value, "'" + key + "' must be set (to an
integer)");
- return Integer.parseInt(value);
+ Preconditions.checkNotNull(value, "'" + key + "' must be set" +
desc.transform(s -> " (to " + desc + ")").or(""));
+ return value;
}
public static long getPropAsLong(Properties properties, String key, long
defaultValue) {