This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 77f9d3896 [GOBBLIN-2054] Fix `CommitActivityImpl` to succeed for
`TaskState`s that did not originate with `CopySource` (#3934)
77f9d3896 is described below
commit 77f9d3896c7b6249fca52e481d4f62bce0a06831
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Apr 25 08:28:02 2024 -0700
[GOBBLIN-2054] Fix `CommitActivityImpl` to succeed for `TaskState`s that
did not originate with `CopySource` (#3934)
---
.../org/apache/gobblin/runtime/JobContext.java | 3 +-
.../java/org/apache/gobblin/runtime/JobState.java | 9 +-
.../ddm/activity/impl/CommitActivityImpl.java | 114 ++++++++++-----------
.../ddm/activity/impl/ProcessWorkUnitImpl.java | 2 +-
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 3 +-
5 files changed, 63 insertions(+), 68 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index a7b7fd0f5..7a8d42c09 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -544,8 +544,7 @@ public class JobContext implements Closeable {
* or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
*/
public static boolean shouldCommitDataInJob(State state) {
- boolean jobCommitPolicyIsFull =
- JobCommitPolicy.getCommitPolicy(state.getProperties()) ==
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
+ boolean jobCommitPolicyIsFull = JobCommitPolicy.getCommitPolicy(state) ==
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
boolean publishDataAtJobLevel =
state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL);
boolean jobDataPublisherSpecified =
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index c58641e89..d61777f83 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -392,16 +392,21 @@ public class JobState extends SourceState implements
JobProgress {
* @return a {@link Map} from dataset URNs to {@link DatasetState}s
representing the dataset states
*/
public Map<String, DatasetState> createDatasetStatesByUrns() {
+ return calculateDatasetStatesByUrns(this.taskStates.values(),
this.skippedTaskStates.values());
+ }
+
+ /** {@see JobState#createDatasetStatesByUrns} */
+ public Map<String, DatasetState>
calculateDatasetStatesByUrns(Collection<TaskState> allTaskStates,
Collection<TaskState> allSkippedTaskStates) {
Map<String, DatasetState> datasetStatesByUrns = Maps.newHashMap();
- for (TaskState taskState : this.taskStates.values()) {
+ for (TaskState taskState : allTaskStates) {
String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
}
- for (TaskState taskState : this.skippedTaskStates.values()) {
+ for (TaskState taskState : allSkippedTaskStates) {
String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
datasetStatesByUrns.get(datasetUrn).addSkippedTaskState(taskState);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index f945af1d0..94b5420ee 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -17,14 +17,7 @@
package org.apache.gobblin.temporal.ddm.activity.impl;
-import com.google.common.base.Function;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import io.temporal.failure.ApplicationFailure;
import java.io.IOException;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -33,13 +26,24 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import io.temporal.failure.ApplicationFailure;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.JobContext;
@@ -57,8 +61,6 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
@Slf4j
@@ -72,37 +74,27 @@ public class CommitActivityImpl implements CommitActivity {
public int commit(WUProcessingSpec workSpec) {
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
- String jobName = UNDEFINED_JOB_NAME;
+ Optional<String> optJobName = Optional.empty();
AutomaticTroubleshooter troubleshooter = null;
try {
FileSystem fs = Help.loadFileSystem(workSpec);
JobState jobState = Help.loadJobState(workSpec, fs);
- jobName = jobState.getJobName();
+ optJobName = Optional.ofNullable(jobState.getJobName());
SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
troubleshooter.start();
- JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker,
troubleshooter.getIssueRepository());
- // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
- Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
- Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
- log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
- StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
- Optional<Queue<TaskState>> taskStateQueueOpt =
-
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath.getName(), numDeserializationThreads);
- if (!taskStateQueueOpt.isPresent()) {
- log.error("No task states found at " + jobOutputPath);
- return 0;
+ List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState,
numDeserializationThreads);
+ if (!taskStates.isEmpty()) {
+ JobContext jobContext = new JobContext(jobState.getProperties(), log,
instanceBroker, troubleshooter.getIssueRepository());
+ commitTaskStates(jobState, taskStates, jobContext);
}
- Queue<TaskState> taskStateQueue = taskStateQueueOpt.get();
- commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue),
globalGobblinContext);
- return taskStateQueue.size();
+ return taskStates.size();
} catch (Exception e) {
//TODO: IMPROVE GRANULARITY OF RETRIES
throw ApplicationFailure.newNonRetryableFailureWithCause(
- String.format("Failed to commit dataset state for some dataset(s) of
job %s", jobName),
+ String.format("Failed to commit dataset state for some dataset(s) of
job %s", optJobName.orElse(UNDEFINED_JOB_NAME)),
IOException.class.toString(),
- new IOException(e),
- null
+ new IOException(e)
);
} finally {
String errCorrelator = String.format("Commit [%s]",
calcCommitId(workSpec));
@@ -118,8 +110,13 @@ public class CommitActivityImpl implements CommitActivity {
* @param jobContext
* @throws IOException
*/
- private void commitTaskStates(State jobState, Collection<TaskState>
taskStates, JobContext jobContext) throws IOException {
- Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ private void commitTaskStates(JobState jobState, List<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ if (!taskStates.isEmpty()) {
+ TaskState firstTaskState = taskStates.get(0);
+ log.info("TaskState (commit) [{}] (**first of {}**): {}",
firstTaskState.getTaskId(), taskStates.size(),
firstTaskState.toJsonString(true));
+ }
+ //TODO: handle skipped tasks?
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
jobState.calculateDatasetStatesByUrns(taskStates, Lists.newArrayList());
final boolean shouldCommitDataInJob =
JobContext.shouldCommitDataInJob(jobState);
final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
//TODO: Make this configurable
@@ -148,7 +145,7 @@ public class CommitActivityImpl implements CommitActivity {
}).iterator(), numCommitThreads,
// TODO: Rewrite executorUtils to use java util optional
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
com.google.common.base.Optional.of("Commit-thread-%d")))
- .executeAndGetResults();
+ .executeAndGetResults();
IteratorExecutor.logFailures(result, null, 10);
@@ -166,7 +163,7 @@ public class CommitActivityImpl implements CommitActivity {
}
if (!IteratorExecutor.verifyAllSuccessful(result)) {
// TODO: propagate cause of failure and determine whether or not this
is retryable to throw a non-retryable failure exception
- String jobName =
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY,
UNDEFINED_JOB_NAME);
+ String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY,
UNDEFINED_JOB_NAME);
throw new IOException("Failed to commit dataset state for some
dataset(s) of job " + jobName);
}
} catch (InterruptedException exc) {
@@ -174,36 +171,31 @@ public class CommitActivityImpl implements CommitActivity
{
}
}
+ /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>}
indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+ private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem
fs, JobState jobState, int numThreads) throws IOException {
+ // TODO - decide whether to replace this method by adapting
TaskStateCollectorService::collectOutputTaskStates (whence much of this code
was drawn)
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ // NOTE: TaskState dir is assumed to be a sibling to the workunits dir
(following conventions of `MRJobLauncher`)
+ String jobIdPathName = new
Path(workSpec.getWorkUnitsDir()).getParent().getName();
+ log.info("TaskStateStore path (name component): '{}' (fs: '{}')",
jobIdPathName, fs.getUri());
+ Optional<Queue<TaskState>> taskStateQueueOpt =
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobIdPathName, numThreads);
+ return taskStateQueueOpt.map(taskStateQueue ->
+ taskStateQueue.stream().peek(taskState ->
+ // CRITICAL: although some `WorkUnit`s, like those created by
`CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`
+ // already themselves contain every prop of their `JobState`,
not all do.
+ // `TaskState extends WorkUnit` serialization will include its
constituent `WorkUnit`, but not the constituent `JobState`.
+ // given some `JobState` props may be essential for
commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
+ taskState.setJobState(jobState)
+ // TODO - decide whether something akin necessary to
streamline cumulative in-memory size of all issues:
consumeTaskIssues(taskState);
+ ).collect(Collectors.toList())
+ ).orElseGet(() -> {
+ log.error("TaskStateStore successfully opened, but no task states found
under (name) '{}'", jobIdPathName);
+ return Lists.newArrayList();
+ });
+ }
+
/** @return id/correlator for this particular commit activity */
private static String calcCommitId(WUProcessingSpec workSpec) {
return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
}
-
- /**
- * Organize task states by dataset urns.
- * @param taskStates
- * @return A map of dataset urns to dataset task states.
- */
- public static Map<String, JobState.DatasetState>
createDatasetStatesByUrns(Collection<TaskState> taskStates) {
- Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
-
- //TODO: handle skipped tasks?
- for (TaskState taskState : taskStates) {
- String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
- datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
- datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
- }
-
- return datasetStatesByUrns;
- }
-
- private static String createDatasetUrn(Map<String, JobState.DatasetState>
datasetStatesByUrns, TaskState taskState) {
- String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY,
ConfigurationKeys.DEFAULT_DATASET_URN);
- if (!datasetStatesByUrns.containsKey(datasetUrn)) {
- JobState.DatasetState datasetState = new JobState.DatasetState();
- datasetState.setDatasetUrn(datasetUrn);
- datasetStatesByUrns.put(datasetUrn, datasetState);
- }
- return datasetUrn;
- }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index aff935dd8..f11ac7068 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -138,7 +138,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit
{
// TODO: if metrics configured, report them now
log.info("WU [{} = {}] - finished commit after {}ms with state {}{}",
wu.getCorrelator(), task.getTaskId(),
taskState.getTaskDuration(), taskState.getWorkingState(),
-
taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL)
+
taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL) &&
taskState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)
? (" to: " +
taskState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)) : "");
log.debug("WU [{} = {}] - task state: {}", wu.getCorrelator(),
task.getTaskId(),
taskState.toJsonString(shouldUseExtendedLogging(wu)));
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index f855b19a9..5da320de9 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -104,8 +104,7 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed Gobblin job %s",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
e.getClass().toString(),
- e,
- null
+ e
);
}
return numWUsGenerated;