[ 
https://issues.apache.org/jira/browse/GOBBLIN-1968?focusedWorklogId=895533&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-895533
 ]

ASF GitHub Bot logged work on GOBBLIN-1968:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Dec/23 19:29
            Start Date: 13/Dec/23 19:29
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1425786741


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -209,36 +209,36 @@ private void collectOutputTaskStates() throws IOException 
{
     // Finish any additional steps defined in handler on driver level.
     // Currently implemented handler for Hive registration only.
     if (optionalTaskCollectorHandler.isPresent()) {
-      log.info("Execute Pipelined TaskStateCollectorService Handler for " + 
taskStateQueue.size() + " tasks");
+      log.info("Execute Pipelined TaskStateCollectorService Handler for " + 
taskStateQueue.get().size() + " tasks");
 
       try {
-        optionalTaskCollectorHandler.get().handle(taskStateQueue);
+        optionalTaskCollectorHandler.get().handle(taskStateQueue.get());
       } catch (Throwable t) {
         if (isJobProceedOnCollectorServiceFailure) {
           log.error("Failed to commit dataset while job proceeds", t);
-          SafeDatasetCommit.setTaskFailureException(taskStateQueue, t);
+          SafeDatasetCommit.setTaskFailureException(taskStateQueue.get(), t);
         } else {
           throw new RuntimeException("Hive Registration as the 
TaskStateCollectorServiceHandler failed.", t);
         }
       }
     }
 
     // Notify the listeners for the completion of the tasks
-    this.eventBus.post(new 
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
+    this.eventBus.post(new 
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue.get())));
   }
 
   /**
-   * Reads in a {@link FsStateStore} folder used to store Task state outputs, 
and returns a queue of {@link TaskState}s
+   * Reads in a @{@link StateStore} and deserializes all task states found in 
the provided table name

Review Comment:
   typo: `@{@link StateStore}` (stray leading `@`; should be `{@link StateStore}`)



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -190,13 +190,13 @@ protected void shutDown() throws Exception {
    */
   private void collectOutputTaskStates() throws IOException {
 
-    final Queue<TaskState> taskStateQueue = 
deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, 
this.stateSerDeRunnerThreads);
-    if (taskStateQueue == null) {
+    final Optional<Queue<TaskState>> taskStateQueue = 
deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir.getName(), 
this.stateSerDeRunnerThreads);
+    if (!taskStateQueue.isPresent()) {
       return;
     }
     // Add the TaskStates of completed tasks to the JobState so when the 
control
     // returns to the launcher, it sees the TaskStates of all completed tasks.
-    for (TaskState taskState : taskStateQueue) {
+    for (TaskState taskState : taskStateQueue.get()) {

Review Comment:
   nit: the farther down we get, the harder it is to keep track of whether these 
`.get()` calls are truly safe (i.e. previously checked).  The common convention 
to establish safety looks like:
   ```
   if (!optTSQ.isPresent()) {
     return;
   } // could be `else`, if you don't mind indentation
   
   Queue<TaskState> tsq = optTSQ.get();
   // (henceforth, no more mention of `optTSQ`)
   ...
   ```
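
For illustration only, here is a self-contained sketch of that unwrap-once convention. It is not the PR's code: `UnwrapOnce`, `loadTaskStates` and `collect` are hypothetical names, `String` stands in for `TaskState`, and `java.util.Optional` is assumed (the same `isPresent()`/`get()` shape applies if the project uses Guava's `Optional`).

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

final class UnwrapOnce {

  // Hypothetical stand-in for a loader that may find no task states at all.
  static Optional<Queue<String>> loadTaskStates(boolean found) {
    if (!found) {
      return Optional.empty();
    }
    Queue<String> queue = new ArrayDeque<>();
    queue.add("task-0");
    queue.add("task-1");
    return Optional.of(queue);
  }

  // Check presence once, unwrap once, then use only the unwrapped queue.
  static int collect(boolean found) {
    Optional<Queue<String>> optTSQ = loadTaskStates(found);
    if (!optTSQ.isPresent()) {
      return 0;
    }

    Queue<String> tsq = optTSQ.get();
    // From here on, optTSQ is never mentioned again.
    int handled = 0;
    for (String taskState : tsq) {
      handled++; // placeholder for per-task work
    }
    return handled;
  }

  public static void main(String[] args) {
    System.out.println(collect(true));
    System.out.println(collect(false));
  }
}
```

Because the `Optional` is touched in exactly one place, a reader does not have to scroll back to confirm that a later `.get()` is guarded.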



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -54,9 +54,14 @@ public int process(WUProcessingSpec workSpec) {
     if (workunitsProcessed > 0) {
       CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
       int result = commitWorkflow.commit(workSpec);
+      if (result == 0) {
+        log.warn("No work units committed at the job level. They could be 
committed at a task level.");
+      }
       return result;
+    } else {
+      log.error("No workunits processed, so no commit will be attempted.");

Review Comment:
   nit: strike out "will be" or "will be attempted", since this should read as past tense



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -75,11 +74,14 @@ public int commit(WUProcessingSpec workSpec) {
       Path jobOutputPath = new Path(new Path(jobIdParent, "output"), 
jobIdParent.getName());
       log.info("Output path at: " + jobOutputPath + " with fs at " + 
fs.getUri());
       StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, 
fs);
-      Collection<TaskState> taskStateQueue =
-          ImmutableList.copyOf(
-              
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobOutputPath, numDeserializationThreads));
-      commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
-      return taskStateQueue.size();
+      Optional<Queue<TaskState>> taskStateQueue =
+              
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, 
jobOutputPath.getName(), numDeserializationThreads);
+      if (!taskStateQueue.isPresent()) {
+        log.error("No task states found at " + jobOutputPath);
+        return 0;
+      }
+      commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue.get()), 
globalGobblinContext);

Review Comment:
   same comment as above: call `.get()` only once, then make no further use of the 
`Optional`





Issue Time Tracking
-------------------

    Worklog Id:     (was: 895533)
    Time Spent: 3h 10m  (was: 3h)

> Gobblin Commit Step Runs on Temporal
> ------------------------------------
>
>                 Key: GOBBLIN-1968
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1968
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Temporal is the next-gen distributed computing platform Gobblin is using to 
> replace MapReduce for many of its data movement workflows. We want to 
> integrate Gobblin's commit step that tracks task states and reports their 
> status with Temporal



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
