[ https://issues.apache.org/jira/browse/GOBBLIN-1968?focusedWorklogId=893101&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893101 ]

ASF GitHub Bot logged work on GOBBLIN-1968:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Nov/23 09:03
            Start Date: 30/Nov/23 09:03
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1410345417


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -193,39 +189,11 @@ protected void shutDown() throws Exception {
    * @throws IOException if it fails to collect the output {@link TaskState}s
    */
   private void collectOutputTaskStates() throws IOException {
-    List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
-      @Override
-      public boolean apply(String input) {
-        return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
-        && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
-      }});
 
-    if (taskStateNames == null || taskStateNames.size() == 0) {
-      LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
+    final Queue<TaskState> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, this.stateSerDeRunnerThreads);
+    if (taskStateQueue == null) {

Review Comment:
   I'd lean heavily toward returning an empty queue rather than `null`.
   
   If the issue is that the queue may be built asynchronously, so that `.isEmpty()` checked at the start is not a reliable sign that there are no task states, then use `Optional` instead.
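The two options in the review comment can be sketched in plain Java. This is a minimal, hypothetical sketch, not Gobblin code: `FakeTaskState`, `deserializeOrEmpty` and `deserializeOptional` are illustrative stand-ins for `TaskState` and `deserializeTaskStatesFromFolder`, and the queue is filled synchronously for brevity, so only the return-type contract is being illustrated.

```java
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class TaskStateQueueSketch {

  // Hypothetical stand-in for Gobblin's TaskState; not the real class.
  static final class FakeTaskState {
    final String taskId;

    FakeTaskState(String taskId) {
      this.taskId = taskId;
    }
  }

  // Option 1: never return null. An empty queue means "no task state files found".
  static Queue<FakeTaskState> deserializeOrEmpty(List<String> tableNames) {
    Queue<FakeTaskState> queue = new ConcurrentLinkedQueue<>();
    for (String name : tableNames) {
      queue.add(new FakeTaskState(name));
    }
    return queue;
  }

  // Option 2: Optional.empty() means "nothing to load", which stays distinct from a
  // queue that exists but (in an asynchronous version) might not be filled yet.
  static Optional<Queue<FakeTaskState>> deserializeOptional(List<String> tableNames) {
    if (tableNames.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(deserializeOrEmpty(tableNames));
  }

  public static void main(String[] args) {
    // Caller side for option 1: a single isEmpty() check replaces the null check.
    Queue<FakeTaskState> states = deserializeOrEmpty(List.of());
    if (states.isEmpty()) {
      System.out.println("No output task state files found");
    }

    // Caller side for option 2: absence is handled explicitly via Optional.
    deserializeOptional(List.of("task-1"))
        .ifPresent(q -> System.out.println("Collected " + q.size() + " task states"));
  }
}
```

Returning an empty queue keeps the caller to one `isEmpty()` check; wrapping in `Optional` only pays off when "no files found" has to be told apart from "queue not yet populated".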





Issue Time Tracking
-------------------

    Worklog Id:     (was: 893101)
    Time Spent: 50m  (was: 40m)

> Gobblin Commit Step Runs on Temporal
> ------------------------------------
>
>                 Key: GOBBLIN-1968
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1968
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Temporal is the next-gen distributed computing platform Gobblin is using to
> replace MapReduce for many of its data movement workflows. We want to
> integrate Gobblin's commit step, which tracks task states and reports their
> status, with Temporal.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
