This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a78ee5473 [GOBBLIN-2007] Implement Distributed Data Movement (DDM)
Gobblin-on-Temporal `WorkUnit` generation (for arbitrary `Source`) (#3880)
a78ee5473 is described below
commit a78ee5473dc9f24bf9acbf8432c9d838d25655c7
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Feb 26 12:31:36 2024 -0800
[GOBBLIN-2007] Implement Distributed Data Movement (DDM)
Gobblin-on-Temporal `WorkUnit` generation (for arbitrary `Source`) (#3880)
* Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit`
generation (for arbitrary `Source`)
* Update in response to review comments
---
.../gobblin/cluster/GobblinHelixJobLauncher.java | 2 +-
.../gobblin/runtime/AbstractJobLauncher.java | 6 +-
.../temporal/GobblinTemporalConfigurationKeys.java | 1 +
.../temporal/cluster/AbstractTemporalWorker.java | 2 +-
.../cluster/GobblinTemporalTaskRunner.java | 6 +-
.../temporal/ddm/activity/GenerateWorkUnits.java | 35 +++++
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 154 +++++++++++++++++++++
.../ddm/launcher/GenerateWorkUnitsJobLauncher.java | 90 ++++++++++++
.../ddm/launcher/ProcessWorkUnitsJobLauncher.java | 2 +-
.../gobblin/temporal/ddm/util/JobStateUtils.java | 101 ++++++++++++--
.../gobblin/temporal/ddm/work/assistance/Help.java | 50 +++++--
.../temporal/ddm/worker/WorkFulfillmentWorker.java | 6 +-
.../ddm/workflow/GenerateWorkUnitsWorkflow.java | 35 +++++
.../impl/GenerateWorkUnitsWorkflowImpl.java | 56 ++++++++
.../impl/ProcessWorkUnitsWorkflowImpl.java | 4 +-
.../temporal/joblauncher/GobblinJobLauncher.java | 2 +-
.../joblauncher/GobblinTemporalJobLauncher.java | 29 +++-
.../org/apache/gobblin/util/JobLauncherUtils.java | 14 +-
18 files changed, 542 insertions(+), 53 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 15fff870b..33b9e0aaf 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -301,7 +301,7 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
WorkUnitStream workUnitStream = new
BasicWorkUnitStream.Builder(workUnits).build();
// For streaming use case, this might be a necessary step to find dataset
specific namespace so that each workUnit
// can create staging and temp directories with the correct determination
of shard-path
- workUnitStream = this.executeHandlers(workUnitStream,
this.destDatasetHandlerService);
+ workUnitStream =
this.destDatasetHandlerService.executeHandlers(workUnitStream);
workUnitStream = this.processWorkUnitStream(workUnitStream, jobState);
workUnits = materializeWorkUnitList(workUnitStream);
try {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 578696f62..e0078803c 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -516,7 +516,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
this.canCleanUpStagingData =
this.canCleanStagingData(this.jobContext.getJobState());
this.destDatasetHandlerService = new
DestinationDatasetHandlerService(jobState, canCleanUpStagingData,
this.eventSubmitter);
closer.register(this.destDatasetHandlerService);
- workUnitStream = this.executeHandlers(workUnitStream,
this.destDatasetHandlerService);
+ workUnitStream =
this.destDatasetHandlerService.executeHandlers(workUnitStream);
//Initialize writer and converter(s)
closer.register(WriterInitializerFactory.newInstace(jobState,
workUnitStream)).initialize();
@@ -698,10 +698,6 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
}
}
- protected WorkUnitStream executeHandlers(WorkUnitStream workUnitStream,
DestinationDatasetHandlerService datasetHandlerService) {
- return datasetHandlerService.executeHandlers(workUnitStream);
- }
-
protected WorkUnitStream processWorkUnitStream(WorkUnitStream
workUnitStream, JobState jobState) {
// Add task ids
workUnitStream = prepareWorkUnits(workUnitStream, jobState);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 182ce7daa..40223e093 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -42,6 +42,7 @@ public interface GobblinTemporalConfigurationKeys {
String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS =
HelloWorldJobLauncher.class.getName();
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";
+ String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "config.overrides";
/**
* Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing
collisions with prod jobs
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
index 0ed6652eb..8ab428c41 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
@@ -78,7 +78,7 @@ public abstract class AbstractTemporalWorker implements
TemporalWorker {
protected abstract Object[] getActivityImplInstances();
private final void stashWorkerConfig(Config cfg) {
- // stash in association with...
+ // stash to associate with...
WorkerConfig.forWorker(this.getClass(), cfg); // the worker itself
Arrays.stream(getWorkflowImplClasses()).forEach(clazz ->
WorkerConfig.withImpl(clazz, cfg)); // its workflow impls
Arrays.stream(getActivityImplInstances()).forEach(obj ->
WorkerConfig.withImpl(obj.getClass(), cfg)); // its activity impls
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
index 61545f6ed..4e55c9777 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
@@ -246,10 +246,12 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
String workerClassName = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.WORKER_CLASS,
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
+ logger.info("Creating worker - class: '{}'", workerClassName);
+ Config workerConfig = clusterConfig;
TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor(
- (Class<TemporalWorker>)Class.forName(workerClassName), clusterConfig,
client);
+ (Class<TemporalWorker>)Class.forName(workerClassName), workerConfig,
client);
worker.start();
- logger.info("A new worker is started.");
+ logger.info("Finished starting worker - class: '{}'", workerClassName);
return worker;
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
new file mode 100644
index 000000000..5f3f0c639
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.util.Properties;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+
+
+/** Activity for generating {@link WorkUnit}s and persisting them to the
{@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
+@ActivityInterface
+public interface GenerateWorkUnits {
+ /** @return the number of {@link WorkUnit}s generated and persisted */
+ @ActivityMethod
+ int generateWorkUnits(Properties jobProps, EventSubmitterContext
eventSubmitterContext);
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
new file mode 100644
index 000000000..797ba7951
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import com.google.api.client.util.Lists;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import io.temporal.failure.ApplicationFailure;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
+import org.apache.gobblin.destination.DestinationDatasetHandlerService;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.WorkUnitStreamSource;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
+
+
+@Slf4j
+public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
+
+ @Override
+ public int generateWorkUnits(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
+ // TODO: decide whether to acquire a job lock (as MR did)!
+ // TODO: provide for job cancellation (unless handling at the
temporal-level of parent workflows)!
+ JobState jobState = new JobState(jobProps);
+ log.info("Created jobState: {}", jobState.toJsonString(true));
+ Optional<Config> thisClassConfig = WorkerConfig.of(this);
+ log.info("Obtained class config: {}", thisClassConfig.isPresent() ?
thisClassConfig.get() : "NO WORKER CONFIG: ERROR!");
+
+ Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
+ log.info("Using work dir root path for job '{}' - '{}'",
jobState.getJobId(), workDirRoot);
+
+ // TODO: determine whether these are actually necessary to do (as
MR/AbstractJobLauncher did)!
+ // SharedResourcesBroker<GobblinScopeTypes> jobBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
+ // jobState.setBroker(jobBroker);
+ // jobState.setWorkUnitAndDatasetStateFunctional(new
CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));
+
+ try (Closer closer = Closer.create()) {
+ // before embarking on (potentially expensive) WU creation, first
pre-check that the FS is available
+ FileSystem fs = JobStateUtils.openFileSystem(jobState);
+ fs.mkdirs(workDirRoot);
+
+ List<WorkUnit> workUnits = generateWorkUnitsForJobState(jobState,
eventSubmitterContext, closer);
+
+ JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
+ JobStateUtils.writeJobState(jobState, workDirRoot, fs);
+
+ return workUnits.size();
+ } catch (ReflectiveOperationException roe) {
+ String errMsg = "Unable to construct a source for generating workunits
for job " + jobState.getJobId();
+ log.error(errMsg, roe);
+ throw ApplicationFailure.newNonRetryableFailureWithCause(errMsg,
"Failure: new Source()", roe);
+ } catch (IOException ioe) {
+ String errMsg = "Failed to generate workunits for job " +
jobState.getJobId();
+ log.error(errMsg, ioe);
+ throw ApplicationFailure.newFailureWithCause(errMsg, "Failure:
generating/writing workunits", ioe);
+ } finally {
+ // TODO: implement Troubleshooter integration!
+ }
+ }
+
+ protected static List<WorkUnit> generateWorkUnitsForJobState(JobState
jobState, EventSubmitterContext eventSubmitterContext, Closer closer)
+ throws ReflectiveOperationException {
+ Source<?, ?> source = JobStateUtils.createSource(jobState);
+ WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
+ ? ((WorkUnitStreamSource) source).getWorkunitStream(jobState)
+ : new
BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();
+
+ // TODO: report (timer) metrics for workunits creation
+ if (workUnitStream == null || workUnitStream.getWorkUnits() == null) { //
indicates a problem getting the WUs
+ String errMsg = "Failure in getting work units for job " +
jobState.getJobId();
+ log.error(errMsg);
+ // TODO: decide whether a non-retryable failure is too severe... (in
most circumstances, it's likely what we want)
+ throw ApplicationFailure.newNonRetryableFailure(errMsg, "Failure:
Source.getWorkUnits()");
+ }
+
+ if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run:
entirely normal result (not a failure)
+ log.warn("No work units created for job " + jobState.getJobId());
+ return Lists.newArrayList();
+ }
+
+ // TODO: count total bytes for progress tracking!
+
+ boolean canCleanUp = canCleanStagingData(jobState);
+ DestinationDatasetHandlerService datasetHandlerService = closer.register(
+ new DestinationDatasetHandlerService(jobState, canCleanUp,
eventSubmitterContext.create()));
+ WorkUnitStream handledWorkUnitStream =
datasetHandlerService.executeHandlers(workUnitStream);
+
+ // initialize writer and converter(s)
+ // TODO: determine whether registration here is effective, or the
lifecycle of this activity is too brief (as is likely!)
+ closer.register(WriterInitializerFactory.newInstace(jobState,
handledWorkUnitStream)).initialize();
+ closer.register(ConverterInitializerFactory.newInstance(jobState,
handledWorkUnitStream)).initialize();
+
+ // update jobState before it gets serialized
+ long startTime = System.currentTimeMillis();
+ jobState.setStartTime(startTime);
+ jobState.setState(JobState.RunningState.RUNNING);
+
+ log.info("Starting job " + jobState.getJobId());
+ // TODO: report (timer) metrics for workunits preparation
+ WorkUnitStream preparedWorkUnitStream =
AbstractJobLauncher.prepareWorkUnits(handledWorkUnitStream, jobState);
+
+ // TODO:
gobblinJobMetricsReporter.reportWorkUnitCountMetrics(this.jobContext.getJobState().getPropAsInt(NUM_WORKUNITS),
jobState);
+
+ // dump the work unit if tracking logs are enabled (post any
materialization for counting)
+ WorkUnitStream trackedWorkUnitStream =
AbstractJobLauncher.addWorkUnitTrackingPerConfig(preparedWorkUnitStream,
jobState, log);
+
+ return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream);
+ }
+
+ protected static boolean canCleanStagingData(JobState jobState) {
+ if
(DeliverySemantics.EXACTLY_ONCE.equals(DeliverySemantics.parse(jobState))) {
+ String errMsg = "DeliverySemantics.EXACTLY_ONCE NOT currently supported;
job " + jobState.getJobId();
+ log.error(errMsg);
+ throw ApplicationFailure.newNonRetryableFailure(errMsg, "Unsupported:
DeliverySemantics.EXACTLY_ONCE");
+ }
+ return true;
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
new file mode 100644
index 000000000..b08835aa6
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.launcher;
+
+import com.typesafe.config.Config;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.client.WorkflowOptions;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.workflow.GenerateWorkUnitsWorkflow;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
+import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A {@link JobLauncher} for the initial triggering of a Temporal workflow
that generates {@link WorkUnit}s per an arbitrary
+ * {@link org.apache.gobblin.source.Source}; see: {@link
GenerateWorkUnitsWorkflow}
+ *
+ * <p>
+ * This class is instantiated by the {@link
GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job
submission to launch the Gobblin job.
+ * The actual task execution happens in the {@link
GobblinTemporalTaskRunner}, usually in a different process.
+ * </p>
+ */
+@Slf4j
+public class GenerateWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {
+
+ public static final String WORKFLOW_ID_BASE = "GenerateWorkUnits";
+
+ public GenerateWorkUnitsJobLauncher(
+ Properties jobProps,
+ Path appWorkDir,
+ List<? extends Tag<?>> metadataTags,
+ ConcurrentHashMap<String, Boolean> runningMap,
+ EventBus eventBus
+ ) throws Exception {
+ super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
+ }
+
+ @Override
+ public void submitJob(List<WorkUnit> workunits) {
+ try {
+ WorkflowOptions options = WorkflowOptions.newBuilder()
+ .setTaskQueue(this.queueName)
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(jobProps)))
+ .build();
+ GenerateWorkUnitsWorkflow workflow =
this.client.newWorkflowStub(GenerateWorkUnitsWorkflow.class, options);
+
+ Config jobConfigWithOverrides =
applyJobLauncherOverrides(ConfigUtils.propertiesToConfig(this.jobProps));
+
+ Help.propagateGaaSFlowExecutionContext(this.jobProps);
+ EventSubmitterContext eventSubmitterContext = new
EventSubmitterContext(this.eventSubmitter);
+
+ int numWorkUnits =
workflow.generate(ConfigUtils.configToProperties(jobConfigWithOverrides),
eventSubmitterContext);
+ log.info("FINISHED - GenerateWorkUnitsWorkflow.generate = {}",
numWorkUnits);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index ee6fe7f80..006572dc0 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -98,7 +98,7 @@ public class ProcessWorkUnitsJobLauncher extends
GobblinTemporalJobLauncher {
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
- .setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec,
ConfigFactory.parseProperties(jobProps)))
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec,
ConfigFactory.parseProperties(jobProps)))
.build();
ProcessWorkUnitsWorkflow workflow =
this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
workflow.process(wuSpec);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
index 87d91e0c4..ae6a8b711 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -17,13 +17,17 @@
package org.apache.gobblin.temporal.ddm.util;
-import java.util.Properties;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.io.Closer;
import com.typesafe.config.ConfigFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -36,24 +40,53 @@ import
org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SourceDecorator;
import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.source.Source;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.ParallelRunner;
/**
* Utilities for applying {@link JobState} info to various ends:
+ * - opening the {@link FileSystem}
+ * - creating the {@link Source}
* - creating a {@link SharedResourcesBroker}
- * - obtaining a {@link StateStore<TaskState>}
+ * - opening the {@link StateStore<TaskState>}
+ * - writing serialized {@link WorkUnit}s to the {@link FileSystem}
+ * - writing serialized {@link JobState} to the {@link FileSystem}
*/
@Slf4j
public class JobStateUtils {
- private static final String OUTPUT_DIR_NAME = "output"; // following
MRJobLauncher.OUTPUT_DIR_NAME
+ public static final String INPUT_DIR_NAME = "input"; // following
MRJobLauncher.INPUT_DIR_NAME
+ public static final String OUTPUT_DIR_NAME = "output"; // following
MRJobLauncher.OUTPUT_DIR_NAME
+ public static final boolean DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES = true;
// reuse same handle among activities executed by the same worker
private static final transient Cache<Path, StateStore<TaskState>>
taskStateStoreByPath = CacheBuilder.newBuilder().build();
private JobStateUtils() {}
+ /** @return the {@link FileSystem} indicated by {@link
ConfigurationKeys#FS_URI_KEY} */
+ public static FileSystem openFileSystem(JobState jobState) throws
IOException {
+ URI fsUri = URI.create(jobState.getProp(ConfigurationKeys.FS_URI_KEY,
ConfigurationKeys.LOCAL_FS_URI));
+ return Help.loadFileSystemForUriForce(fsUri, jobState);
+ }
+
+ /** @return a new instance of {@link Source} identified by {@link
ConfigurationKeys#SOURCE_CLASS_KEY} */
+ public static Source<?, ?> createSource(JobState jobState) throws
ReflectiveOperationException {
+ Class<?> sourceClass =
Class.forName(jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY));
+ log.info("Creating source: '{}'", sourceClass.getName());
+ Source<?, ?> source = new SourceDecorator<>(
+ Source.class.cast(sourceClass.newInstance()),
+ jobState.getJobId(), log);
+ return source;
+ }
+
public static StateStore<TaskState> openTaskStateStore(JobState jobState,
FileSystem fs) {
try {
Path taskStateStorePath = JobStateUtils.getTaskStateStorePath(jobState,
fs);
@@ -71,23 +104,69 @@ public class JobStateUtils {
return new FsStateStore<>(fs, taskStateStorePath.toUri().getPath(),
TaskState.class);
}
+ /**
+ * ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+ * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+ * @return "base" dir root path for work dir (parent of inputs, output task
states, etc.)
+ */
+ public static Path getWorkDirRoot(JobState jobState) {
+ return new Path(
+ new Path(jobState.getProp(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY),
jobState.getJobName()),
+ jobState.getJobId());
+ }
+
/**
* ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
* {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
* @return path to {@link FsStateStore<TaskState>} backing dir
*/
public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) {
- Properties jobProps = jobState.getProperties();
- Path jobOutputPath = new Path(
- new Path(
- new Path(
- jobProps.getProperty(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY),
- JobState.getJobNameFromProps(jobProps)),
- JobState.getJobIdFromProps(jobProps)),
- OUTPUT_DIR_NAME);
+ Path jobOutputPath = new Path(getWorkDirRoot(jobState), OUTPUT_DIR_NAME);
return fs.makeQualified(jobOutputPath);
}
+ /** write serialized {@link WorkUnit}s in parallel into files named after
the jobID and task IDs */
+ public static void writeWorkUnits(List<WorkUnit> workUnits, Path
workDirRootPath, JobState jobState, FileSystem fs)
+ throws IOException {
+ String jobId = jobState.getJobId();
+ Path targetDirPath = new Path(workDirRootPath, INPUT_DIR_NAME);
+
+ int numThreads =
ParallelRunner.getNumThreadsConfig(jobState.getProperties());
+ Closer closer = Closer.create(); // (NOTE: try-with-resources syntax
wouldn't allow `catch { closer.rethrow(t) }`)
+ try {
+ ParallelRunner parallelRunner = closer.register(new
ParallelRunner(numThreads, fs));
+
+ JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new
JobLauncherUtils.WorkUnitPathCalculator();
+ int i = 0;
+ for (WorkUnit workUnit : workUnits) {
+ Path workUnitFile = pathCalculator.calcNextPath(workUnit, jobId,
targetDirPath);
+ if (i++ == 0) {
+ log.info("Writing work unit file [first of {}]: '{}'",
workUnits.size(), workUnitFile);
+ }
+ parallelRunner.serializeToFile(workUnit, workUnitFile);
+ }
+ } catch (Throwable t) {
+ throw closer.rethrow(t);
+ } finally {
+ closer.close();
+ }
+ }
+
+ /** write serialized `jobState` beneath `workDirRootPath` of `fs`, per
{@link JobStateUtils#DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES} */
+ public static void writeJobState(JobState jobState, Path workDirRootPath,
FileSystem fs) throws IOException {
+ writeJobState(jobState, workDirRootPath, fs,
DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES);
+ }
+
+ /** write serialized `jobState` beneath `workDirRootPath` of `fs`, per
whether to `writePreviousWorkUnitStates` */
+ public static void writeJobState(JobState jobState, Path workDirRootPath,
FileSystem fs, boolean writePreviousWorkUnitStates) throws IOException {
+ Path targetPath = new Path(workDirRootPath,
AbstractJobLauncher.JOB_STATE_FILE_NAME);
+ try (DataOutputStream dataOutputStream = new
DataOutputStream(fs.create(targetPath))) {
+ log.info("Writing serialized jobState to '{}'", targetPath);
+ jobState.write(dataOutputStream, false, writePreviousWorkUnitStates);
+ log.info("Finished writing jobState to '{}'", targetPath);
+ }
+ }
+
public static SharedResourcesBroker<GobblinScopeTypes>
getSharedResourcesBroker(JobState jobState) {
SharedResourcesBroker<GobblinScopeTypes> globalBroker =
SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index dd47f590f..09da984a6 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.temporal.ddm.work.assistance;
import java.io.IOException;
import java.net.URI;
import java.util.Optional;
+import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutionException;
@@ -31,6 +32,8 @@ import com.google.common.cache.CacheBuilder;
import com.typesafe.config.Config;
+import org.slf4j.MDC;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,9 +47,9 @@ import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
import org.apache.gobblin.temporal.ddm.work.styles.JobStateful;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.SerializationUtils;
-import org.slf4j.MDC;
/** Various capabilities useful in implementing Distributed Data Movement
(DDM) */
@@ -63,22 +66,35 @@ public class Help {
private Help() {}
  /**
   * @return execution-specific name, incorporating any {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from the
   * {@link JobState} loaded via `f` (the ID, when present, is prefixed ahead of the per-exec qualifier)
   */
  public static String qualifyNamePerExecWithFlowExecId(String name, FileSystemJobStateful f, Config workerConfig) {
    return name + "_" + calcPerExecQualifierWithOptFlowExecId(f, workerConfig);
  }
  /** @return execution-specific name, NOT incorporating {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} */
  public static String qualifyNamePerExecWithoutFlowExecId(String name, Config workerConfig) {
    // qualifier derives solely from `workerConfig`, so names remain stable regardless of any flow execution ID
    return name + "_" + calcPerExecQualifier(workerConfig);
  }
- public static String calcPerExecQualifier(FileSystemJobStateful f, Config
workerConfig) {
+ /** @return execution-specific name, incorporating any {@link
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
+ public static String qualifyNamePerExecWithFlowExecId(String name, Config
workerConfig) {
+ Optional<String> optFlowExecId =
Optional.ofNullable(ConfigUtils.getString(workerConfig,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
+ return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId,
workerConfig);
+ }
+
+ public static String calcPerExecQualifierWithOptFlowExecId(Optional<String>
optFlowExecId, Config workerConfig) {
+ return optFlowExecId.map(x -> x + "_").orElse("") +
calcPerExecQualifier(workerConfig);
+ }
+
+ public static String
calcPerExecQualifierWithOptFlowExecId(FileSystemJobStateful f, Config
workerConfig) {
Optional<String> optFlowExecId = Optional.empty();
try {
+ // TODO: determine whether the same could be obtained from
`workerConfig` (likely much more efficiently)
optFlowExecId =
Optional.of(loadJobState(f).getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
null));
} catch (IOException e) {
log.warn("unable to loadJobState", e);
}
- return optFlowExecId.map(x -> x + "_").orElse("") +
calcPerExecQualifier(workerConfig);
+ return calcPerExecQualifierWithOptFlowExecId(optFlowExecId, workerConfig);
}
public static String calcPerExecQualifier(Config workerConfig) {
@@ -210,9 +226,25 @@ public class Help {
}
  /** Record the GaaS flow group/name/execution-ID from `jobState` in the logging MDC, for subsequent log lines */
  public static void propagateGaaSFlowExecutionContext(JobState jobState) {
    // "<<NOT SET>>" placeholder keeps each MDC entry present (and clearly labeled) even when a prop is absent
    doGaaSFlowExecutionContextPropagation(
        jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY, "<<NOT SET>>"),
        jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY, "<<NOT SET>>"),
        jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "<<NOT SET>>")
    );
  }
+
  /** Record the GaaS flow group/name/execution-ID from `jobProps` in the logging MDC, for subsequent log lines */
  public static void propagateGaaSFlowExecutionContext(Properties jobProps) {
    // "<<NOT SET>>" placeholder keeps each MDC entry present (and clearly labeled) even when a property is absent
    doGaaSFlowExecutionContextPropagation(
        jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<<NOT SET>>"),
        jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY, "<<NOT SET>>"),
        jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "<<NOT SET>>")
    );
  }
+
+ protected static void doGaaSFlowExecutionContextPropagation(String
flowGroup, String flowName, String flowExecId) {
// TODO: log4j2 has better syntax around conditional logging such that the
key does not need to be included in the value
- MDC.put(ConfigurationKeys.FLOW_NAME_KEY,
String.format("%s:%s",ConfigurationKeys.FLOW_NAME_KEY,
jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY, "<<NOT SET>>")));
- MDC.put(ConfigurationKeys.FLOW_GROUP_KEY,
String.format("%s:%s",ConfigurationKeys.FLOW_GROUP_KEY,
jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY, "<<NOT SET>>")));
- MDC.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
String.format("%s:%s",ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "<<NOT SET>>")));
+ MDC.put(ConfigurationKeys.FLOW_GROUP_KEY,
String.format("%s:%s",ConfigurationKeys.FLOW_GROUP_KEY, flowGroup));
+ MDC.put(ConfigurationKeys.FLOW_NAME_KEY,
String.format("%s:%s",ConfigurationKeys.FLOW_NAME_KEY, flowName));
+ MDC.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
String.format("%s:%s",ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecId));
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 99c4d0536..02631719c 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -25,8 +25,10 @@ import io.temporal.worker.WorkerOptions;
import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
+import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
+import
org.apache.gobblin.temporal.ddm.workflow.impl.GenerateWorkUnitsWorkflowImpl;
import
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
import
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
@@ -43,12 +45,12 @@ public class WorkFulfillmentWorker extends
AbstractTemporalWorker {
  /** @return every workflow implementation class this worker registers with its Temporal worker (alphabetical order) */
  @Override
  protected Class<?>[] getWorkflowImplClasses() {
    return new Class[] { CommitStepWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class, ProcessWorkUnitsWorkflowImpl.class };
  }

  /** @return one instance of each activity implementation this worker registers (alphabetical order) */
  @Override
  protected Object[] getActivityImplInstances() {
    return new Object[] { new CommitActivityImpl(), new GenerateWorkUnitsImpl(), new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl() };
  }
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
new file mode 100644
index 000000000..7c843e1af
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/GenerateWorkUnitsWorkflow.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.gobblin.temporal.ddm.workflow;

import java.util.Properties;

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;


/** Workflow simply to generate {@link WorkUnit}s from a {@link org.apache.gobblin.source.Source} (and persist them for subsequent processing) */
@WorkflowInterface
public interface GenerateWorkUnitsWorkflow {
  /**
   * Generate {@link WorkUnit}s for the {@link org.apache.gobblin.source.Source} configured by `props`, and persist them.
   * @param props job properties configuring the source and work-unit generation
   * @param eventSubmitterContext context used for event submission during generation
   * @return the number of {@link WorkUnit}s generated and persisted
   */
  @WorkflowMethod
  int generate(Properties props, EventSubmitterContext eventSubmitterContext);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
new file mode 100644
index 000000000..5fff3fe9b
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/GenerateWorkUnitsWorkflowImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.gobblin.temporal.ddm.workflow.impl;

import java.time.Duration;
import java.util.Properties;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.workflow.GenerateWorkUnitsWorkflow;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;


/** Thin {@link GenerateWorkUnitsWorkflow} that delegates all work to a single {@link GenerateWorkUnits} activity */
@Slf4j
public class GenerateWorkUnitsWorkflowImpl implements GenerateWorkUnitsWorkflow {
  // NOTE(review): public constant in lowerCamelCase; UPPER_SNAKE_CASE would be conventional if callers permit renaming
  public static final Duration startToCloseTimeout = Duration.ofMinutes(90); // TODO: make configurable

  // exponential backoff starting at 3s (doubling, capped at 100s), at most 4 attempts in total
  private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder()
      .setInitialInterval(Duration.ofSeconds(3))
      .setMaximumInterval(Duration.ofSeconds(100))
      .setBackoffCoefficient(2)
      .setMaximumAttempts(4)
      .build();

  private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder()
      .setStartToCloseTimeout(startToCloseTimeout)
      .setRetryOptions(ACTIVITY_RETRY_OPTS)
      .build();

  // Temporal-generated stub; invocations are scheduled as activities under ACTIVITY_OPTS
  private final GenerateWorkUnits activityStub = Workflow.newActivityStub(GenerateWorkUnits.class, ACTIVITY_OPTS);

  /** Delegate generation (and persistence) entirely to the {@link GenerateWorkUnits} activity */
  @Override
  public int generate(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
    return activityStub.generateWorkUnits(jobProps, eventSubmitterContext);
  }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 531a018c7..679a39dae 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -117,7 +117,7 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
protected NestingExecWorkflow<WorkUnitClaimCheck>
createProcessingWorkflow(FileSystemJobStateful f) {
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
- .setWorkflowId(Help.qualifyNamePerExec(CHILD_WORKFLOW_ID_BASE, f,
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
.build();
// TODO: to incorporate multiple different concrete `NestingExecWorkflow`
sub-workflows in the same super-workflow... shall we use queues?!?!?
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
@@ -126,7 +126,7 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
protected CommitStepWorkflow createCommitStepWorkflow() {
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
- .setWorkflowId(Help.qualifyNamePerExec(COMMIT_STEP_WORKFLOW_ID_BASE,
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
.build();
return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index c61d8c72a..12d8861ec 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -93,7 +93,7 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean>
runningMap, EventBus eventbus)
throws Exception {
super(jobProps, HelixUtils.initBaseEventTags(jobProps, metadataTags));
- log.debug("GobblinJobLauncher: jobProps {}, appWorkDir {}", jobProps,
appWorkDir);
+ log.debug("GobblinJobLauncher: appWorkDir {}; jobProps {}", appWorkDir,
jobProps);
this.runningMap = runningMap;
this.appWorkDir = appWorkDir;
this.inputWorkUnitDir = new Path(appWorkDir,
GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index 914b3aa90..82aeb8b20 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -21,16 +21,18 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-
import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.Workflow;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
@@ -38,6 +40,8 @@ import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.*;
import static
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory.createClientInstance;
@@ -70,7 +74,7 @@ public abstract class GobblinTemporalJobLauncher extends
GobblinJobLauncher {
List<? extends Tag<?>> metadataTags,
ConcurrentHashMap<String, Boolean> runningMap, EventBus eventBus)
throws Exception {
super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
- log.debug("GobblinTemporalJobLauncher: jobProps {}, appWorkDir {}",
jobProps, appWorkDir);
+ log.info("GobblinTemporalJobLauncher: appWorkDir {}; jobProps {}",
appWorkDir, jobProps);
String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
this.workflowServiceStubs = createServiceInstance(connectionUri);
@@ -83,6 +87,14 @@ public abstract class GobblinTemporalJobLauncher extends
GobblinJobLauncher {
startCancellationExecutor();
}
+ /** @return {@link Config} now featuring all overrides rooted at {@link
GobblinTemporalConfigurationKeys#GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES}
*/
+ protected Config applyJobLauncherOverrides(Config config) {
+ Config configOverrides = ConfigUtils.getConfig(config,
+
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES,
ConfigFactory.empty());
+ log.info("appying config overrides: {}", configOverrides);
+ return configOverrides.withFallback(config);
+ }
+
@Override
protected void handleLaunchFinalization() {
// NOTE: This code only makes sense when there is 1 source / workflow
being launched per application for Temporal. This is a stop-gap
@@ -104,12 +116,15 @@ public abstract class GobblinTemporalJobLauncher extends
GobblinJobLauncher {
log.info("Cancel temporal workflow");
}
  /** No-op: merely logs a warning, since not expected to be invoked */
  @Override
  protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) {
    log.warn("NOT IMPLEMENTED: Temporal removeTasksFromCurrentJob");
  }
  /** No-op: merely logs a warning, since not expected to be invoked */
  @Override
  protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) {
    log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob");
  }
}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 6439f41b2..f70f11f75 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -26,8 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-
-import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -41,7 +40,6 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
-import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -65,19 +63,13 @@ public class JobLauncherUtils {
private static Cache<String, FileSystem> fileSystemCacheByOwners =
CacheBuilder.newBuilder().build();
/** Calculate monotonically-increasing paths for multi-WU files */
- @AllArgsConstructor
- @NotThreadSafe
public static class WorkUnitPathCalculator {
- private int nextMultiWorkUnitTaskId;
-
- public WorkUnitPathCalculator() {
- this(0);
- }
+ private final AtomicInteger nextMultiWorkUnitTaskId = new AtomicInteger(0);
  /**
   * Compute the {@link Path} (under `basePath`) at which to store the serialized `workUnit`, named after its task ID.
   * A multi-work-unit draws a fresh, monotonically-increasing multi-task ID on each call; a single work unit uses its
   * own {@link ConfigurationKeys#TASK_ID_KEY}.  Note: this method only calculates the path — it does not serialize.
   */
  public Path calcNextPath(WorkUnit workUnit, String jobId, Path basePath) {
    String workUnitFileName = workUnit.isMultiWorkUnit()
        ? JobLauncherUtils.newMultiTaskId(jobId, nextMultiWorkUnitTaskId.getAndIncrement()) + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
        : workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
    return new Path(basePath, workUnitFileName);
  }