Will-Lo commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1401122843


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java:
##########
@@ -193,39 +189,11 @@ protected void shutDown() throws Exception {
    * @throws IOException if it fails to collect the output {@link TaskState}s
    */
   private void collectOutputTaskStates() throws IOException {
-    List<String> taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate<String>() {
-      @Override
-      public boolean apply(String input) {
-        return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
-        && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
-      }});
 
-    if (taskStateNames == null || taskStateNames.size() == 0) {
-      LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
+    final Queue<TaskState> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, this.stateSerDeRunnerThreads);
+    if (taskStateQueue == null) {
       return;
     }
-
-    final Queue<TaskState> taskStateQueue = Queues.newConcurrentLinkedQueue();
-    try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, null)) {
-      for (final String taskStateName : taskStateNames) {
-        LOGGER.debug("Found output task state file " + taskStateName);
-        // Deserialize the TaskState and delete the file
-        stateSerDeRunner.submitCallable(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
-            taskStateQueue.add(taskState);
-            taskStateStore.delete(outputTaskStateDir.getName(), taskStateName);

Review Comment:
   Oh, I think I had to delete this line so that I wouldn't have to regenerate 
the workunits each time, but good catch: it needs to be added back in this PR.
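
   For illustration only, here is a minimal, self-contained sketch of the 
read-then-delete behavior being discussed. It does not use Gobblin's classes: 
`InMemoryStore`, `collect`, and the `deleteAfterRead` flag are hypothetical 
stand-ins, and the ".tst" suffix is just an example value. The flag shows one 
possible way to keep the files around while debugging without dropping the 
delete from the normal path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class TaskStateCollectSketch {

  // Hypothetical in-memory stand-in for a state store (NOT Gobblin's StateStore):
  // maps a table name to a serialized state string.
  public static final class InMemoryStore {
    private final Map<String, String> tables = new ConcurrentHashMap<>();

    public void put(String tableName, String state) {
      tables.put(tableName, state);
    }

    // Returns a snapshot of the table names that end with the given suffix.
    public List<String> getTableNames(String suffix) {
      List<String> names = new ArrayList<>();
      for (String name : tables.keySet()) {
        if (name.endsWith(suffix)) {
          names.add(name);
        }
      }
      return names;
    }

    public String get(String tableName) {
      return tables.get(tableName);
    }

    public void delete(String tableName) {
      tables.remove(tableName);
    }

    public int size() {
      return tables.size();
    }
  }

  // Reads every matching state into a queue and, unless disabled (an assumed
  // debugging switch), deletes each entry after it has been read so that it
  // is not collected a second time.
  public static Queue<String> collect(InMemoryStore store, String suffix, boolean deleteAfterRead) {
    Queue<String> queue = new ConcurrentLinkedQueue<>();
    for (String name : store.getTableNames(suffix)) {
      queue.add(store.get(name));
      if (deleteAfterRead) {
        store.delete(name);
      }
    }
    return queue;
  }

  public static void main(String[] args) {
    InMemoryStore store = new InMemoryStore();
    store.put("task_1.tst", "state-1");
    store.put("task_2.tst", "state-2");
    store.put("unrelated.txt", "ignored");

    Queue<String> collected = collect(store, ".tst", true);
    System.out.println("collected=" + collected.size() + ", remaining=" + store.size());
  }
}
```

   With `deleteAfterRead` set to `false`, a second call to `collect` returns 
the same states again, which is the re-collection the delete is there to 
prevent.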



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
