This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f6eeb533ad [GOBBLIN-2152] Cleanup activity gobblin temporal (#4047)
f6eeb533ad is described below
commit f6eeb533ad757aaa34bfb8e69a833732956f3775
Author: William Lo <[email protected]>
AuthorDate: Wed Sep 11 13:52:38 2024 -0400
[GOBBLIN-2152] Cleanup activity gobblin temporal (#4047)
Adds cleanup activity and job scoped directories in Temporal
---
...eWorkUnits.java => DeleteWorkDirsActivity.java} | 18 ++--
.../temporal/ddm/activity/GenerateWorkUnits.java | 3 +-
.../ddm/activity/impl/CommitActivityImpl.java | 8 +-
.../activity/impl/DeleteWorkDirsActivityImpl.java | 103 +++++++++++++++++++++
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 53 +++++++++--
.../ddm/launcher/ExecuteGobblinJobLauncher.java | 31 +++++--
.../ddm/launcher/GenerateWorkUnitsJobLauncher.java | 13 ++-
.../DirDeletionResult.java} | 26 +++---
.../GenerateWorkUnitsResult.java} | 30 +++---
.../gobblin/temporal/ddm/work/assistance/Help.java | 18 ++--
.../temporal/ddm/worker/WorkFulfillmentWorker.java | 5 +-
.../ddm/workflow/GenerateWorkUnitsWorkflow.java | 3 +-
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 93 ++++++++++++++++++-
.../impl/GenerateWorkUnitsWorkflowImpl.java | 3 +-
.../impl/ProcessWorkUnitsWorkflowImpl.java | 7 +-
.../activity/impl/GenerateWorkUnitsImplTest.java | 97 +++++++++++++++++++
.../impl/ExecuteGobblinWorkflowImplTest.java | 62 +++++++++++++
17 files changed, 495 insertions(+), 78 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/DeleteWorkDirsActivity.java
similarity index 66%
copy from
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
copy to
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/DeleteWorkDirsActivity.java
index 5f3f0c6391..16d0333c11 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/DeleteWorkDirsActivity.java
@@ -14,22 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gobblin.temporal.ddm.activity;
-import java.util.Properties;
+import java.util.Set;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
-import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.DirDeletionResult;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
-/** Activity for generating {@link WorkUnit}s and persisting them to the
{@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
+/** Activity for deleting a list of temporary work directories */
@ActivityInterface
-public interface GenerateWorkUnits {
- /** @return the number of {@link WorkUnit}s generated and persisted */
+public interface DeleteWorkDirsActivity {
+ /**
+ * Clean the list of resources specified in the input
+ * TODO: Generalize the input to support multiple platforms outside of just
HDFS
+ */
@ActivityMethod
- int generateWorkUnits(Properties jobProps, EventSubmitterContext
eventSubmitterContext);
+ DirDeletionResult delete(WUProcessingSpec workSpec, EventSubmitterContext
eventSubmitterContext, Set<String> workDirPaths);
}
+
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
index 5f3f0c6391..862b46c40f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
@@ -23,6 +23,7 @@ import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@@ -31,5 +32,5 @@ import
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
public interface GenerateWorkUnits {
/** @return the number of {@link WorkUnit}s generated and persisted */
@ActivityMethod
- int generateWorkUnits(Properties jobProps, EventSubmitterContext
eventSubmitterContext);
+ GenerateWorkUnitsResult generateWorkUnits(Properties jobProps,
EventSubmitterContext eventSubmitterContext);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index e8490d7141..97699f2ce2 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -153,7 +153,7 @@ public class CommitActivityImpl implements CommitActivity {
}).iterator(), numCommitThreads,
// TODO: Rewrite executorUtils to use java util optional
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
com.google.common.base.Optional.of("Commit-thread-%d")))
- .executeAndGetResults();
+ .executeAndGetResults();
IteratorExecutor.logFailures(result, null, 10);
@@ -192,8 +192,8 @@ public class CommitActivityImpl implements CommitActivity {
// `TaskState extends WorkUnit` serialization will include its
constituent `WorkUnit`, but not the constituent `JobState`.
// given some `JobState` props may be essential for
commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
taskState.setJobState(jobState)
- // TODO - decide whether something akin necessary to
streamline cumulative in-memory size of all issues:
consumeTaskIssues(taskState);
- ).collect(Collectors.toList())
+ // TODO - decide whether something akin necessary to streamline
cumulative in-memory size of all issues: consumeTaskIssues(taskState);
+ ).collect(Collectors.toList())
).orElseGet(() -> {
log.error("TaskStateStore successfully opened, but no task states found
under (name) '{}'", jobIdPathName);
return Lists.newArrayList();
@@ -235,4 +235,4 @@ public class CommitActivityImpl implements CommitActivity {
private static String calcCommitId(WUProcessingSpec workSpec) {
return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java
new file mode 100644
index 0000000000..6b14a70156
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/DeleteWorkDirsActivityImpl.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import io.temporal.failure.ApplicationFailure;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
+import org.apache.gobblin.temporal.ddm.work.DirDeletionResult;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.WriterUtils;
+
+
+@Slf4j
+public class DeleteWorkDirsActivityImpl implements DeleteWorkDirsActivity {
+ static String UNDEFINED_JOB_NAME = "<job_name_stub>";
+
+ @Override
+ public DirDeletionResult delete(WUProcessingSpec workSpec,
EventSubmitterContext eventSubmitterContext, Set<String> workDirPaths) {
+ // Ensure that non HDFS writers exit early as they rely on a different
cleanup process, can consider consolidation in the future
+ // through an abstracted cleanup method implemented at a writer level
+ if (workDirPaths.isEmpty()) {
+ return new DirDeletionResult();
+ }
+ //TODO: Emit timers to measure length of cleanup step
+ Optional<String> optJobName = Optional.empty();
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ optJobName = Optional.ofNullable(jobState.getJobName());
+
+ Map<String, Boolean> attemptedCleanedDirectories =
jobState.getPropAsBoolean(ConfigurationKeys.CLEANUP_STAGING_DATA_PER_TASK,
ConfigurationKeys.DEFAULT_CLEANUP_STAGING_DATA_PER_TASK) ?
+ cleanupStagingDataPerTask(jobState, workDirPaths) :
cleanupStagingDataForEntireJob(jobState, workDirPaths);
+
+ return new DirDeletionResult(attemptedCleanedDirectories);
+ } catch (Exception e) {
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ String.format("Failed to cleanup temporary folders for job %s",
optJobName.orElse(UNDEFINED_JOB_NAME)),
+ IOException.class.toString(),
+ new IOException(e)
+ );
+ }
+ }
+
+ //TODO: Support task level deletes if necessary, currently it is deemed
redundant due to collecting temp dirs during generate work unit step
+ private static Map<String, Boolean> cleanupStagingDataPerTask(JobState
jobState, Set<String> workDirPaths) throws IOException {
+ throw new IOException("Clean up staging data by task is not supported.
Please set " + ConfigurationKeys.CLEANUP_STAGING_DATA_PER_TASK + " to false.");
+ }
+
+ private static Map<String, Boolean> cleanupStagingDataForEntireJob(JobState
state, Set<String> workDirPaths) throws IOException {
+
+ String writerFsUri =
state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
ConfigurationKeys.LOCAL_FS_URI);
+ FileSystem fs = JobLauncherUtils.getFsWithProxy(state, writerFsUri,
WriterUtils.getFsConfiguration(state));
+ Map<String, Boolean> attemptedCleanedDirectories = new HashMap<>();
+
+ for (String resource : workDirPaths) {
+ Path pathToClean = new Path(resource);
+ log.info("Deleting resource directory " + pathToClean);
+ try {
+ HadoopUtils.deletePath(fs, pathToClean, true);
+ attemptedCleanedDirectories.put(resource, true);
+ } catch (IOException e) {
+ boolean doesExist = fs.exists(pathToClean);
+ // Only record failure to clean if the directory still exists, if it
does not then we can assume it was already cleaned by another process
+ if (doesExist) {
+ log.error("Failed to delete resource directory " + pathToClean, e);
+ attemptedCleanedDirectories.put(resource, false);
+ }
+ }
+ }
+ return attemptedCleanedDirectories;
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 242b2c69e5..8344bae6f9 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -18,8 +18,11 @@
package org.apache.gobblin.temporal.ddm.activity.impl;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -30,6 +33,7 @@ import com.google.common.io.Closer;
import io.temporal.failure.ApplicationFailure;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -40,10 +44,12 @@ import
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -53,7 +59,7 @@ import
org.apache.gobblin.writer.initializer.WriterInitializerFactory;
public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
@Override
- public int generateWorkUnits(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
+ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps,
EventSubmitterContext eventSubmitterContext) {
// TODO: decide whether to acquire a job lock (as MR did)!
// TODO: provide for job cancellation (unless handling at the
temporal-level of parent workflows)!
JobState jobState = new JobState(jobProps);
@@ -74,12 +80,12 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
FileSystem fs = JobStateUtils.openFileSystem(jobState);
fs.mkdirs(workDirRoot);
- List<WorkUnit> workUnits = generateWorkUnitsForJobState(jobState,
eventSubmitterContext, closer);
-
+ Set<String> resourcesToCleanUp = new HashSet<>();
+ List<WorkUnit> workUnits =
generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState,
eventSubmitterContext, closer, resourcesToCleanUp);
JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
JobStateUtils.writeJobState(jobState, workDirRoot, fs);
- return jobState.getTaskCount();
+ return new GenerateWorkUnitsResult(jobState.getTaskCount(),
resourcesToCleanUp);
} catch (ReflectiveOperationException roe) {
String errMsg = "Unable to construct a source for generating workunits
for job " + jobState.getJobId();
log.error(errMsg, roe);
@@ -94,7 +100,8 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
}
}
- protected static List<WorkUnit> generateWorkUnitsForJobState(JobState
jobState, EventSubmitterContext eventSubmitterContext, Closer closer)
+ protected List<WorkUnit>
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState,
EventSubmitterContext eventSubmitterContext, Closer closer,
+ Set<String> resourcesToCleanUp)
throws ReflectiveOperationException {
Source<?, ?> source = JobStateUtils.createSource(jobState);
WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
@@ -120,7 +127,7 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
DestinationDatasetHandlerService datasetHandlerService = closer.register(
new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs,
eventSubmitterContext.create()));
WorkUnitStream handledWorkUnitStream =
datasetHandlerService.executeHandlers(workUnitStream);
-
+
resourcesToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
// initialize writer and converter(s)
// TODO: determine whether registration here is effective, or the
lifecycle of this activity is too brief (as is likely!)
closer.register(WriterInitializerFactory.newInstace(jobState,
handledWorkUnitStream)).initialize();
@@ -142,4 +149,38 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream);
}
+
+ protected static Set<String> calculateWorkDirsToCleanup(WorkUnitStream
workUnitStream) {
+ Set<String> resourcesToCleanUp = new HashSet<>();
+ // Validate every workunit if they have the temp dir props since some
workunits may be commit steps
+ Iterator<WorkUnit> workUnitIterator = workUnitStream.getWorkUnits();
+ while (workUnitIterator.hasNext()) {
+ WorkUnit workUnit = workUnitIterator.next();
+ if (workUnit.isMultiWorkUnit()) {
+ List<WorkUnit> workUnitList = ((MultiWorkUnit)
workUnit).getWorkUnits();
+ for (WorkUnit wu : workUnitList) {
+
resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu));
+ }
+ } else {
+
resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit));
+ }
+ }
+ return resourcesToCleanUp;
+ }
+
+
+ private static Set<String>
collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit workUnit) {
+ Set<String> resourcesToCleanUp = new HashSet<>();
+ if (workUnit.contains(ConfigurationKeys.WRITER_STAGING_DIR)) {
+
resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_STAGING_DIR));
+ }
+ if (workUnit.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
+
resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+ }
+ if (workUnit.getPropAsBoolean(ConfigurationKeys.CLEAN_ERR_DIR,
ConfigurationKeys.DEFAULT_CLEAN_ERR_DIR)
+ && workUnit.contains(ConfigurationKeys.ROW_LEVEL_ERR_FILE)) {
+
resourcesToCleanUp.add(workUnit.getProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE));
+ }
+ return resourcesToCleanUp;
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 6950e6a678..6efb93d128 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -24,14 +24,19 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.Path;
import com.google.common.eventbus.EventBus;
-import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.temporal.client.WorkflowOptions;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
@@ -41,6 +46,8 @@ import
org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.PropertiesUtils;
/**
@@ -74,22 +81,32 @@ public class ExecuteGobblinJobLauncher extends
GobblinTemporalJobLauncher {
@Override
public void submitJob(List<WorkUnit> workunits) {
try {
+ Properties finalProps = adjustJobProperties(this.jobProps);
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
-
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(jobProps)))
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(finalProps)))
.build();
ExecuteGobblinWorkflow workflow =
this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options);
- Config jobConfigWithOverrides =
applyJobLauncherOverrides(ConfigUtils.propertiesToConfig(this.jobProps));
-
- Help.propagateGaaSFlowExecutionContext(this.jobProps);
+ Help.propagateGaaSFlowExecutionContext(finalProps);
EventSubmitterContext eventSubmitterContext = new
EventSubmitterContext.Builder(eventSubmitter)
- .withGaaSJobProps(this.jobProps)
+ .withGaaSJobProps(finalProps)
.build();
- ExecGobblinStats execGobblinStats =
workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides),
eventSubmitterContext);
+ ExecGobblinStats execGobblinStats = workflow.execute(finalProps,
eventSubmitterContext);
log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}",
execGobblinStats);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+
+ // Generate properties such as Job ID, modifying task staging dirs and
output dirs
+ protected Properties adjustJobProperties(Properties inputJobProps) throws
Exception {
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
SharedResourcesBrokerFactory.createDefaultTopLevelBroker(ConfigFactory.parseProperties(inputJobProps),
+ GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+ Properties configOverridesProp =
ConfigUtils.configToProperties(applyJobLauncherOverrides(ConfigUtils.propertiesToConfig(inputJobProps)));
+ configOverridesProp.setProperty(ConfigurationKeys.JOB_ID_KEY,
JobLauncherUtils.newJobId(JobState.getJobNameFromProps(configOverridesProp),
+ PropertiesUtils.getPropAsLong(configOverridesProp,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis())));
+ JobContext jobContext = new JobContext(configOverridesProp, log,
instanceBroker, null);
+ return jobContext.getJobState().getProperties();
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
index 6ddc96a977..68b7910bcd 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
@@ -17,24 +17,24 @@
package org.apache.gobblin.temporal.ddm.launcher;
-import com.typesafe.config.Config;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.Path;
import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.temporal.client.WorkflowOptions;
-
-import org.apache.hadoop.fs.Path;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.GenerateWorkUnitsWorkflow;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
@@ -80,9 +80,8 @@ public class GenerateWorkUnitsJobLauncher extends
GobblinTemporalJobLauncher {
Help.propagateGaaSFlowExecutionContext(this.jobProps);
EventSubmitterContext eventSubmitterContext = new
EventSubmitterContext.Builder(this.eventSubmitter).build();
-
- int numWorkUnits =
workflow.generate(ConfigUtils.configToProperties(jobConfigWithOverrides),
eventSubmitterContext);
- log.info("FINISHED - GenerateWorkUnitsWorkflow.generate = {}",
numWorkUnits);
+ GenerateWorkUnitsResult generateWorkUnitStats =
workflow.generate(ConfigUtils.configToProperties(jobConfigWithOverrides),
eventSubmitterContext);
+ log.info("FINISHED - GenerateWorkUnitsWorkflow.generate = {}",
generateWorkUnitStats.getGeneratedWuCount());
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
similarity index 54%
copy from
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
copy to
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
index 5f3f0c6391..257e5b5772 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
@@ -15,21 +15,23 @@
* limitations under the License.
*/
-package org.apache.gobblin.temporal.ddm.activity;
+package org.apache.gobblin.temporal.ddm.work;
-import java.util.Properties;
+import java.util.Map;
-import io.temporal.activity.ActivityInterface;
-import io.temporal.activity.ActivityMethod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+/**
+ * Data structure representing the stats for a cleaned-up work directory,
where it returns a map of directories and the result of their cleanup
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class DirDeletionResult {
-/** Activity for generating {@link WorkUnit}s and persisting them to the
{@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
-@ActivityInterface
-public interface GenerateWorkUnits {
- /** @return the number of {@link WorkUnit}s generated and persisted */
- @ActivityMethod
- int generateWorkUnits(Properties jobProps, EventSubmitterContext
eventSubmitterContext);
+ @NonNull private Map<String, Boolean> successesByDirPath;
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
similarity index 52%
copy from
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
copy to
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
index 5f3f0c6391..5f798055e2 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
@@ -15,21 +15,25 @@
* limitations under the License.
*/
-package org.apache.gobblin.temporal.ddm.activity;
+package org.apache.gobblin.temporal.ddm.work;
-import java.util.Properties;
+import java.util.Set;
-import io.temporal.activity.ActivityInterface;
-import io.temporal.activity.ActivityMethod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
-
-/** Activity for generating {@link WorkUnit}s and persisting them to the
{@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
-@ActivityInterface
-public interface GenerateWorkUnits {
- /** @return the number of {@link WorkUnit}s generated and persisted */
- @ActivityMethod
- int generateWorkUnits(Properties jobProps, EventSubmitterContext
eventSubmitterContext);
+/**
+ * Data structure representing the result of generating work units, where it
returns the number of generated work units and
+ * the folders that should be cleaned up after the full job execution is completed
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class GenerateWorkUnitsResult {
+ @NonNull private int generatedWuCount;
+ // Resources that the Temporal Job Launcher should clean up for Gobblin
temporary work directory paths in writers
+ @NonNull private Set<String> workDirPathsToDelete;
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 462e784228..e2011d7a9c 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -22,22 +22,20 @@ import java.net.URI;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.MDC;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-
import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.MDC;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -48,8 +46,8 @@ import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
-import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
+import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.ddm.work.styles.JobStateful;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index d4b8b3a911..74737af598 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -20,11 +20,13 @@ package org.apache.gobblin.temporal.ddm.worker;
import java.util.concurrent.TimeUnit;
import com.typesafe.config.Config;
+
import io.temporal.client.WorkflowClient;
import io.temporal.worker.WorkerOptions;
import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
+import
org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
@@ -52,7 +54,8 @@ public class WorkFulfillmentWorker extends
AbstractTemporalWorker {
@Override
protected Object[] getActivityImplInstances() {
- return new Object[] { new CommitActivityImpl(), new
GenerateWorkUnitsImpl(), new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl()
};
+ return new Object[] { new CommitActivityImpl(), new
DeleteWorkDirsActivityImpl(),new GenerateWorkUnitsImpl(),
+ new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl()};
}
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
index 7c843e1afe..6865c765f5 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
@@ -23,6 +23,7 @@ import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@@ -31,5 +32,5 @@ import
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
public interface GenerateWorkUnitsWorkflow {
  /**
   * Generate and persist {@link WorkUnit}s for the job described by {@code props}.
   *
   * @return a {@link GenerateWorkUnitsResult} carrying the count of generated work units
   *         plus the work-dir paths to delete during cleanup (no longer just an {@code int} count)
   */
  @WorkflowMethod
  GenerateWorkUnitsResult generate(Properties props, EventSubmitterContext eventSubmitterContext);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index f2723bef97..8a04e6e519 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -17,10 +17,15 @@
package org.apache.gobblin.temporal.ddm.workflow.impl;
+import java.io.IOException;
import java.net.URI;
import java.time.Duration;
+import java.util.HashSet;
+import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.typesafe.config.ConfigFactory;
@@ -36,11 +41,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.DirDeletionResult;
import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
@@ -72,23 +80,41 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
private final GenerateWorkUnits genWUsActivityStub =
Workflow.newActivityStub(GenerateWorkUnits.class,
GEN_WUS_ACTIVITY_OPTS);
+ private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS =
RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(3))
+ .setMaximumInterval(Duration.ofSeconds(100))
+ .setBackoffCoefficient(2)
+ .setMaximumAttempts(4)
+ .build();
+
+ private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS =
ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(Duration.ofHours(1))
+ .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
+ .build();
+ private final DeleteWorkDirsActivity deleteWorkDirsActivityStub =
Workflow.newActivityStub(DeleteWorkDirsActivity.class,
DELETE_WORK_DIRS_ACTIVITY_OPTS);
+
@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit();
EventTimer timer = timerFactory.createJobTimer();
+ Optional<GenerateWorkUnitsResult> generateWorkUnitResultsOpt =
Optional.empty();
+ WUProcessingSpec wuSpec = createProcessingSpec(jobProps,
eventSubmitterContext);
+ boolean isSuccessful = false;
try {
- int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
+ generateWorkUnitResultsOpt =
Optional.of(genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext));
+ int numWUsGenerated =
generateWorkUnitResultsOpt.get().getGeneratedWuCount();
int numWUsCommitted = 0;
CommitStats commitStats = CommitStats.createEmpty();
if (numWUsGenerated > 0) {
- WUProcessingSpec wuSpec = createProcessingSpec(jobProps,
eventSubmitterContext);
ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow(jobProps);
commitStats = processWUsWorkflow.process(wuSpec);
numWUsCommitted = commitStats.getNumCommittedWorkUnits();
}
timer.stop();
- return new ExecGobblinStats(numWUsGenerated, numWUsCommitted,
jobProps.getProperty(Help.USER_TO_PROXY_KEY), commitStats.getDatasetStats());
+ isSuccessful = true;
+ return new ExecGobblinStats(numWUsGenerated, numWUsCommitted,
jobProps.getProperty(Help.USER_TO_PROXY_KEY),
+ commitStats.getDatasetStats());
} catch (Exception e) {
// Emit a failed GobblinTrackingEvent to record job failures
timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
@@ -98,6 +124,27 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
e,
null
);
+ } finally {
+ // TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid
flight
+ try {
+ log.info("Cleaning up work dirs for job {}",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ if (generateWorkUnitResultsOpt.isPresent()) {
+ cleanupWorkDirs(wuSpec, eventSubmitterContext,
generateWorkUnitResultsOpt.get().getWorkDirPathsToDelete());
+ } else {
+ log.warn("Skipping cleanup of work dirs for job due to no output
from GenerateWorkUnits");
+ }
+ } catch (IOException e) {
+ // Only fail the job with a new failure if the job was successful,
otherwise keep the original error
+ if (isSuccessful) {
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ String.format("Failed cleaning Gobblin job %s",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
+ e.getClass().getName(),
+ e,
+ null
+ );
+ }
+ log.error("Failed to cleanup work dirs", e);
+ }
}
}
@@ -123,4 +170,44 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
}
return wuSpec;
}
+
+ private void cleanupWorkDirs(WUProcessingSpec workSpec,
EventSubmitterContext eventSubmitterContext, Set<String> directoriesToClean)
+ throws IOException {
+ // TODO: Add configuration to support cleaning up historical work dirs
from same job name
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ // TODO: Avoid cleaning up if work is being checkpointed e.g. midway of a
commit for EXACTLY_ONCE
+
+ if (PropertiesUtils.getPropAsBoolean(jobState.getProperties(),
ConfigurationKeys.CLEANUP_STAGING_DATA_BY_INITIALIZER, "false")) {
+ log.info("Skipping cleanup of work dirs for job due to initializer
handling the cleanup");
+ return;
+ }
+
+ DirDeletionResult dirDeletionResult =
deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
+ calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
+
+ for (String dir : directoriesToClean) {
+ if (!dirDeletionResult.getSuccessesByDirPath().get(dir)) {
+ throw new IOException("Unable to delete one of more directories in the
DeleteWorkDirsActivity. Please clean up manually.");
+ }
+ }
+ }
+
+ protected static Set<String> calculateWorkDirsToDelete(String jobId,
Set<String> workDirsToClean) throws IOException {
+ // Only delete directories that are associated with the current job,
otherwise
+ Set<String> resultSet = new HashSet<>();
+ Set<String> nonJobDirs = new HashSet<>();
+ for (String dir : workDirsToClean) {
+ if (dir.contains(jobId)) {
+ resultSet.add(dir);
+ } else {
+ log.warn("Skipping deletion of directory {} as it is not associated
with job {}", dir, jobId);
+ nonJobDirs.add(dir);
+ }
+ }
+ if (!nonJobDirs.isEmpty()) {
+ throw new IOException("Found directories set to delete not associated
with job " + jobId + ": " + nonJobDirs + ". Please validate staging and output
directories");
+ }
+ return resultSet;
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
index 5fff3fe9b2..ad68fb518f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
@@ -27,6 +27,7 @@ import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
import org.apache.gobblin.temporal.ddm.workflow.GenerateWorkUnitsWorkflow;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@@ -50,7 +51,7 @@ public class GenerateWorkUnitsWorkflowImpl implements
GenerateWorkUnitsWorkflow
private final GenerateWorkUnits activityStub =
Workflow.newActivityStub(GenerateWorkUnits.class, ACTIVITY_OPTS);
@Override
- public int generate(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
+ public GenerateWorkUnitsResult generate(Properties jobProps,
EventSubmitterContext eventSubmitterContext) {
return activityStub.generateWorkUnits(jobProps, eventSubmitterContext);
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index c8afbce25a..975b9f6043 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -23,7 +23,6 @@ import com.typesafe.config.ConfigFactory;
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
@@ -59,10 +58,8 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
private CommitStats performWork(WUProcessingSpec workSpec) {
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec);
- int workunitsProcessed = processingWorkflow.performWorkload(
- WorkflowAddr.ROOT, workload, 0,
- workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
- );
+ int workunitsProcessed =
processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+ workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
if (workunitsProcessed > 0) {
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
CommitStats result = commitWorkflow.commit(workSpec);
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
new file mode 100644
index 0000000000..86c5ac12de
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+
+
+public class GenerateWorkUnitsImplTest {
+
+ @Test
+ public void testFetchesWorkDirsFromWorkUnits() {
+ List<WorkUnit> workUnits = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ WorkUnit workUnit = new WorkUnit();
+ workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/" + i);
+ workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/" + i);
+ workUnit.setProp("qualitychecker.row.err.file",
"/tmp/jobId/row-err/file");
+ workUnit.setProp("qualitychecker.clean.err.dir", "true");
+ workUnits.add(workUnit);
+ }
+ WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits)
+ .setFiniteStream(true)
+ .build();
+ Set<String> output =
GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
+ Assert.assertEquals(output.size(), 11);
+ }
+
+ @Test
+ public void testFetchesWorkDirsFromMultiWorkUnits() {
+ List<WorkUnit> workUnits = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
+ for (int j = 0; j < 3; j++) {
+ WorkUnit workUnit = new WorkUnit();
+ workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/");
+ workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/");
+ workUnit.setProp("qualitychecker.row.err.file",
"/tmp/jobId/row-err/file");
+ workUnit.setProp("qualitychecker.clean.err.dir", "true");
+ multiWorkUnit.addWorkUnit(workUnit);
+ }
+ workUnits.add(multiWorkUnit);
+ }
+ WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits)
+ .setFiniteStream(true)
+ .build();
+ Set<String> output =
GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
+ Assert.assertEquals(output.size(), 3);
+ }
+
+ @Test
+ public void testFetchesUniqueWorkDirsFromMultiWorkUnits() {
+ List<WorkUnit> workUnits = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
+ for (int j = 0; j < 3; j++) {
+ WorkUnit workUnit = new WorkUnit();
+ // Each MWU will have its own staging and output dir
+ workUnit.setProp("writer.staging.dir", "/tmp/jobId/" + i +
"/task-staging/");
+ workUnit.setProp("writer.output.dir", "/tmp/jobId/" + i +
"task-output/");
+ workUnit.setProp("qualitychecker.row.err.file",
"/tmp/jobId/row-err/file");
+ workUnit.setProp("qualitychecker.clean.err.dir", "true");
+ multiWorkUnit.addWorkUnit(workUnit);
+ }
+ workUnits.add(multiWorkUnit);
+ }
+ WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits)
+ .setFiniteStream(true)
+ .build();
+ Set<String> output =
GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
+ Assert.assertEquals(output.size(), 11);
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImplTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImplTest.java
new file mode 100644
index 0000000000..8d8ad982a6
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImplTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ExecuteGobblinWorkflowImplTest {
+
+ @Test
+ public void testCalculateWorkDirsDeletion() throws Exception {
+ String jobId = "jobId";
+ Set<String> dirsToDelete = new HashSet<>();
+ dirsToDelete.add("/tmp/jobId/task-staging/");
+ dirsToDelete.add("/tmp/jobId/task-output/");
+ dirsToDelete.add("/tmp/jobId/task-output/file");
+ dirsToDelete.add("/tmp/jobId/otherDir");
+ Set<String> result =
ExecuteGobblinWorkflowImpl.calculateWorkDirsToDelete(jobId, dirsToDelete);
+ Assert.assertEquals(result.size(), 4);
+ Assert.assertEquals(result, dirsToDelete);
+ }
+
+ @Test
+ public void testThrowsIfNonJobDirInWorkDirs() throws Exception {
+ String jobId = "jobId";
+ Set<String> dirsToDelete = new HashSet<>();
+ dirsToDelete.add("/tmp/jobId/task-staging/");
+ dirsToDelete.add("/tmp/jobId/task-output/");
+ dirsToDelete.add("/tmp/jobId/task-output/file");
+ dirsToDelete.add("/tmp/jobId");
+ // Add a non-job dir that should blow up the job
+ dirsToDelete.add("/tmp");
+ dirsToDelete.add("/sharedDir");
+
+ Assert.assertThrows(IOException.class, () ->
ExecuteGobblinWorkflowImpl.calculateWorkDirsToDelete(jobId, dirsToDelete));
+ try {
+ ExecuteGobblinWorkflowImpl.calculateWorkDirsToDelete(jobId,
dirsToDelete);
+ } catch (IOException e) {
+ Assert.assertEquals(e.getMessage(),
+ "Found directories set to delete not associated with job jobId:
[/tmp, /sharedDir]. Please validate staging and output directories");
+ }
+ }
+}