[
https://issues.apache.org/jira/browse/GOBBLIN-2054?focusedWorklogId=916277&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-916277
]
ASF GitHub Bot logged work on GOBBLIN-2054:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 24/Apr/24 22:56
Start Date: 24/Apr/24 22:56
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3934:
URL: https://github.com/apache/gobblin/pull/3934#discussion_r1578603726
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
         }
         if (!IteratorExecutor.verifyAllSuccessful(result)) {
           // TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
-          String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
+          String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
           throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
         }
       } catch (InterruptedException exc) {
         throw new IOException(exc);
       }
   }
+  /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+  private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem fs, JobState jobState, int numThreads) throws IOException {
+    // TODO - decide whether to replace this method by adapting TaskStateCollectorService::collectOutputTaskStates (whence much of this code was drawn)
+    StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+    // NOTE: TaskState dir is assumed to be a sibling to the workunits dir (following conventions of `MRJobLauncher`)
+    String jobIdPathName = new Path(workSpec.getWorkUnitsDir()).getParent().getName();
+    log.info("TaskStateStore path (name component): '{}' (fs: '{}')", jobIdPathName, fs.getUri());
+    Optional<Queue<TaskState>> taskStateQueueOpt = TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobIdPathName, numThreads);
+    return taskStateQueueOpt.map(taskStateQueue ->
+        taskStateQueue.stream().peek(taskState ->
+            // CRITICAL: although some `WorkUnit`s, like those created by `CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`,
+            // already themselves contain every prop of their `JobState`, not all do.
+            // `TaskState extends WorkUnit` serialization will include its constituent `WorkUnit`, but not the constituent `JobState`.
+            // given some `JobState` props may be essential for commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
+            taskState.setJobState(jobState)
+            // TODO - decide whether something akin necessary to streamline cumulative in-memory size of all issues: consumeTaskIssues(taskState);
+        ).collect(Collectors.toList())
+    ).orElseGet(() -> {
+      log.error("TaskStateStore successfully opened, but no task states found under (name) '{}'", jobIdPathName);
+      return Lists.newArrayList();
+    });
+  }
+
   /** @return id/correlator for this particular commit activity */
   private static String calcCommitId(WUProcessingSpec workSpec) {
     return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
   }
-
-  /**
-   * Organize task states by dataset urns.
-   * @param taskStates
-   * @return A map of dataset urns to dataset task states.
-   */
-  public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
-    Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
-
-    //TODO: handle skipped tasks?
-    for (TaskState taskState : taskStates) {
-      String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
-      datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
-      datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
-    }
-
-    return datasetStatesByUrns;
-  }
-
-  private static String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
-    String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
-    if (!datasetStatesByUrns.containsKey(datasetUrn)) {
-      JobState.DatasetState datasetState = new JobState.DatasetState();
Review Comment:
this no-arg ctor should possibly only be used for serialization.
w/o a `jobName`, the following happens later while persisting dataset states at the end of commit:
```
Caused by: java.lang.IllegalArgumentException: Can not create a Path from a null string
	at org.apache.hadoop.fs.Path.checkPathArg(Path.java:159)
	at org.apache.hadoop.fs.Path.<init>(Path.java:175)
	at org.apache.hadoop.fs.Path.<init>(Path.java:110)
	at org.apache.gobblin.runtime.FsDatasetStateStore.sanitizeDatasetStatestoreNameFromDatasetURN(FsDatasetStateStore.java:175)
	at org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:386)
	at org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:90)
	at org.apache.gobblin.runtime.SafeDatasetCommit.persistDatasetState(SafeDatasetCommit.java:418)
	at org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:191)
	... 8 more
```
iceberg-distcp succeeded only because it happened to have this set:
```
state.store.enabled=false
```
and thus never attempted to persist dataset state (the [gobblin-on-MR](https://github.com/apache/gobblin/blob/a74d17a0123218ac4c867caeefaee2f472b438e7/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java#L725) solution is what we now leverage)
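To make the failure mode above concrete: the no-arg `DatasetState` ctor leaves the dataset's `jobName` null, and `FsDatasetStateStore` later derives a `Path` from that name. A minimal sketch reproducing just the `Path` validation failure (the `checkPathArg` here is a hypothetical stand-in mirroring Hadoop's `Path` constructor check, not Gobblin code):
```java
public class NullPathNameDemo {
  // hypothetical stand-in for the validation in org.apache.hadoop.fs.Path
  static void checkPathArg(String path) {
    if (path == null) {
      throw new IllegalArgumentException("Can not create a Path from a null string");
    }
    if (path.length() == 0) {
      throw new IllegalArgumentException("Can not create a Path from an empty string");
    }
  }

  public static void main(String[] args) {
    // what `new JobState.DatasetState()` effectively leaves unset
    String jobNameFromNoArgCtor = null;
    try {
      checkPathArg(jobNameFromNoArgCtor);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());  // prints "Can not create a Path from a null string"
    }
  }
}
```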
Issue Time Tracking
-------------------
Worklog Id: (was: 916277)
Time Spent: 40m (was: 0.5h)
> `CommitActivityImpl` fails for job types (sources) other than Iceberg-Distcp
> ----------------------------------------------------------------------------
>
> Key: GOBBLIN-2054
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2054
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> gobblin-on-temporal execution has been failing for job types (sources) other than
> iceberg-distcp (which uses `CopySource`). In particular, Commit fails with:
> {code}
> java.lang.IllegalArgumentException: Missing required property writer.output.dir
> 	at com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
> 	at org.apache.gobblin.util.WriterUtils.getWriterOutputDir(WriterUtils.java:121)
> 	at org.apache.gobblin.publisher.BaseDataPublisher.publishData(BaseDataPublisher.java:390)
> 	at org.apache.gobblin.publisher.BaseDataPublisher.publishMultiTaskData(BaseDataPublisher.java:379)
> 	at org.apache.gobblin.publisher.BaseDataPublisher.publishData(BaseDataPublisher.java:366)
> 	at org.apache.gobblin.publisher.DataPublisher.publish(DataPublisher.java:81)
> 	at org.apache.gobblin.runtime.SafeDatasetCommit.commitDataset(SafeDatasetCommit.java:260)
> 	at org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:168)
> 	at org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:64)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at org.apache.gobblin.util.executors.MDCPropagatingRunnable.run(MDCPropagatingRunnable.java:39)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> This is odd because that same prop had already been used prior to commit,
> while processing the `WorkUnit`! Moreover, logging shows it to be present
> within the `JobState`.
> Anyway, even when using a private build that hard-coded that property, this
> later error arises:
> {code}
> Caused by: java.lang.IllegalArgumentException: Can not create a Path from a null string
> 	at org.apache.hadoop.fs.Path.checkPathArg(Path.java:159)
> 	at org.apache.hadoop.fs.Path.<init>(Path.java:175)
> 	at org.apache.hadoop.fs.Path.<init>(Path.java:110)
> 	at org.apache.gobblin.runtime.FsDatasetStateStore.sanitizeDatasetStatestoreNameFromDatasetURN(FsDatasetStateStore.java:175)
> 	at org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:386)
> 	at org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:90)
> 	at org.apache.gobblin.runtime.SafeDatasetCommit.persistDatasetState(SafeDatasetCommit.java:418)
> 	at org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:191)
> 	... 8 more
> {code}
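The description above reflects the root cause this PR addresses: `TaskState extends WorkUnit`, so serializing a `TaskState` captures its `WorkUnit` props but not its parent `JobState` props, and a freshly deserialized `TaskState` cannot resolve a prop like `writer.output.dir` until a `JobState` is re-associated. A minimal sketch with hypothetical stand-in classes (not Gobblin's actual `State` hierarchy):
```java
import java.util.HashMap;
import java.util.Map;

// hypothetical stand-ins for Gobblin's State hierarchy -- illustration only
class State {
  final Map<String, String> props = new HashMap<>();

  String getProp(String key) {
    return props.get(key);
  }
}

class JobState extends State {
}

class TaskState extends State {
  // transient association: NOT captured when a TaskState is serialized
  JobState jobState;

  void setJobState(JobState jobState) {
    this.jobState = jobState;
  }

  // prop lookup falls back to the associated JobState, if any
  @Override
  String getProp(String key) {
    String v = props.get(key);
    return (v != null || jobState == null) ? v : jobState.getProp(key);
  }
}

public class ReassociateDemo {
  public static void main(String[] args) {
    JobState job = new JobState();
    job.props.put("writer.output.dir", "/data/out");

    // freshly deserialized from the task-state store: jobState is null
    TaskState deserialized = new TaskState();
    System.out.println(deserialized.getProp("writer.output.dir"));  // prints "null" -> commit would fail

    // the re-association `loadTaskStates` performs via taskState.setJobState(jobState)
    deserialized.setJobState(job);
    System.out.println(deserialized.getProp("writer.output.dir"));  // prints "/data/out"
  }
}
```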
--
This message was sent by Atlassian Jira
(v8.20.10#820010)