[
https://issues.apache.org/jira/browse/GOBBLIN-1968?focusedWorklogId=895533&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-895533
]
ASF GitHub Bot logged work on GOBBLIN-1968:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Dec/23 19:29
Start Date: 13/Dec/23 19:29
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1425786741
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -209,36 +209,36 @@ private void collectOutputTaskStates() throws IOException
{
// Finish any additional steps defined in handler on driver level.
// Currently implemented handler for Hive registration only.
if (optionalTaskCollectorHandler.isPresent()) {
- log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
+ log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.get().size() + " tasks");
try {
- optionalTaskCollectorHandler.get().handle(taskStateQueue);
+ optionalTaskCollectorHandler.get().handle(taskStateQueue.get());
} catch (Throwable t) {
if (isJobProceedOnCollectorServiceFailure) {
log.error("Failed to commit dataset while job proceeds", t);
- SafeDatasetCommit.setTaskFailureException(taskStateQueue, t);
+ SafeDatasetCommit.setTaskFailureException(taskStateQueue.get(), t);
} else {
throw new RuntimeException("Hive Registration as the TaskStateCollectorServiceHandler failed.", t);
}
}
}
// Notify the listeners for the completion of the tasks
- this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
+ this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue.get())));
}
/**
- * Reads in a {@link FsStateStore} folder used to store Task state outputs, and returns a queue of {@link TaskState}s
+ * Reads in a @{@link StateStore} and deserializes all task states found in the provided table name
Review Comment:
typo: stray `@` before the tag; `@{@link StateStore}` should be `{@link StateStore}`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -190,13 +190,13 @@ protected void shutDown() throws Exception {
*/
private void collectOutputTaskStates() throws IOException {
- final Queue<TaskState> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, this.stateSerDeRunnerThreads);
- if (taskStateQueue == null) {
+ final Optional<Queue<TaskState>> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir.getName(), this.stateSerDeRunnerThreads);
+ if (!taskStateQueue.isPresent()) {
+ if (!taskStateQueue.isPresent()) {
return;
}
// Add the TaskStates of completed tasks to the JobState so when the control
// returns to the launcher, it sees the TaskStates of all completed tasks.
- for (TaskState taskState : taskStateQueue) {
+ for (TaskState taskState : taskStateQueue.get()) {
Review Comment:
nit: the farther down we get, the harder it is to keep track of whether these
`.get()` calls are truly safe (i.e. previously checked). The common convention
for establishing safety looks like:
```
if (!optTSQ.isPresent()) {
return;
} // could be `else`, if you don't mind indentation
Queue<TaskState> tsq = optTSQ.get();
// (henceforth, no more mention of `optTSQ`)
...
```
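To make the convention concrete, here is a minimal, self-contained sketch. It is not the PR's code: `OptionalUnwrapSketch` and `collect` are hypothetical names, `String` stands in for `TaskState`, and `java.util.Optional` is assumed purely so the example compiles on its own (the `isPresent()`/`get()` calls are the same ones the diff uses).

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical stand-in for the collector; names are illustrative only.
class OptionalUnwrapSketch {

  // Returns the number of task states handled, or 0 when none were found.
  static int collect(Optional<Queue<String>> optTaskStates) {
    if (!optTaskStates.isPresent()) {
      return 0;
    }
    // Unwrap exactly once; everything below uses the plain Queue.
    Queue<String> taskStates = optTaskStates.get();

    int handled = 0;
    for (String taskState : taskStates) {
      if (taskState != null) {
        handled++; // placeholder for the per-task work
      }
    }
    return handled;
  }

  public static void main(String[] args) {
    Queue<String> states = new ArrayDeque<>();
    states.add("task-0");
    states.add("task-1");
    System.out.println(collect(Optional.of(states))); // prints 2
    System.out.println(collect(Optional.empty()));    // prints 0
  }
}
```

After the single `get()`, the `Optional` never appears again, so a reader does not have to scroll back to confirm that each later access was guarded.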
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -54,9 +54,14 @@ public int process(WUProcessingSpec workSpec) {
if (workunitsProcessed > 0) {
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
int result = commitWorkflow.commit(workSpec);
+ if (result == 0) {
+ log.warn("No work units committed at the job level. They could be committed at a task level.");
+ }
return result;
+ } else {
+ log.error("No workunits processed, so no commit will be attempted.");
Review Comment:
nit: strike out -will be- or -will be attempted-, since by this point the outcome is already in the past
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -75,11 +74,14 @@ public int commit(WUProcessingSpec workSpec) {
Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
- Collection<TaskState> taskStateQueue =
- ImmutableList.copyOf(
- TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads));
- commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
- return taskStateQueue.size();
+ Optional<Queue<TaskState>> taskStateQueue =
+ TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath.getName(), numDeserializationThreads);
+ if (!taskStateQueue.isPresent()) {
+ log.error("No task states found at " + jobOutputPath);
+ return 0;
+ }
+ commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue.get()), globalGobblinContext);
Review Comment:
same comment as above: call `.get()` only once, then make no further use of the
`Optional`
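Applied to a commit-style method, the same idea might look like the sketch below. Everything here is illustrative rather than taken from the PR: `CommitUnwrapSketch`, `commit`, and the `COMMITTED` list are hypothetical, `String` stands in for `TaskState`, and `java.util.Optional` plus an unmodifiable `ArrayList` copy are assumed in place of whatever `Optional` and `ImmutableList` types the real code uses.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical stand-in for the commit activity; not Gobblin's actual class.
class CommitUnwrapSketch {

  // Records what was "committed" so the sketch has an observable effect.
  static final List<String> COMMITTED = new ArrayList<>();

  static int commit(Optional<Queue<String>> optTaskStates) {
    if (!optTaskStates.isPresent()) {
      return 0; // nothing found, nothing committed
    }
    // Single unwrap, copied into a read-only snapshot for the commit call.
    List<String> taskStates =
        Collections.unmodifiableList(new ArrayList<>(optTaskStates.get()));
    commitTaskStates(taskStates);
    return taskStates.size();
  }

  // Placeholder for the real commit logic.
  static void commitTaskStates(List<String> taskStates) {
    COMMITTED.addAll(taskStates);
  }

  public static void main(String[] args) {
    Queue<String> states = new ArrayDeque<>();
    states.add("task-0");
    System.out.println(commit(Optional.of(states))); // prints 1
    System.out.println(commit(Optional.empty()));    // prints 0
  }
}
```

Both the commit call and the returned count read from the one unwrapped snapshot, so neither needs its own `.get()`.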
Issue Time Tracking
-------------------
Worklog Id: (was: 895533)
Time Spent: 3h 10m (was: 3h)
> Gobblin Commit Step Runs on Temporal
> ------------------------------------
>
> Key: GOBBLIN-1968
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1968
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 3h 10m
> Remaining Estimate: 0h
>
> Temporal is the next-gen distributed computing platform Gobblin is using to
> replace MapReduce for many of its data movement workflows. We want to
> integrate Gobblin's commit step that tracks task states and reports their
> status with Temporal