[ https://issues.apache.org/jira/browse/GOBBLIN-2153?focusedWorklogId=934586&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934586 ]
ASF GitHub Bot logged work on GOBBLIN-2153: ------------------------------------------- Author: ASF GitHub Bot Created on: 12/Sep/24 18:23 Start Date: 12/Sep/24 18:23 Worklog Time Spent: 10m Work Description: Will-Lo commented on code in PR #4052: URL: https://github.com/apache/gobblin/pull/4052#discussion_r1757366881 ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -57,21 +59,25 @@ public CommitStats process(WUProcessingSpec workSpec) { } private CommitStats performWork(WUProcessingSpec workSpec) { - Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec); - NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec); - int workunitsProcessed = processingWorkflow.performWorkload( - WorkflowAddr.ROOT, workload, 0, - workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty() - ); - if (workunitsProcessed > 0) { - CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(); - CommitStats result = commitWorkflow.commit(workSpec); - if (result.getNumCommittedWorkUnits() == 0) { - log.warn("No work units committed at the job level. They could have been committed at the task level."); + try { + Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec); + JobState jobState = Help.loadJobState(workSpec, Help.loadFileSystem(workSpec)); + NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, jobState); + int workunitsProcessed = processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, + workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()); + if (workunitsProcessed > 0) { + CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(jobState); + CommitStats result = commitWorkflow.commit(workSpec); + if (result.getNumCommittedWorkUnits() == 0) { + log.warn("No work units committed at the job level. They could have been committed at the task level."); + } + return result; + } else { + log.error("No work units processed, so no commit attempted."); + return CommitStats.createEmpty(); } - return result; - } else { - log.error("No work units processed, so no commit attempted."); + } catch (Exception ignored) { + log.error("Exception occured during performing Work", ignored); return CommitStats.createEmpty(); Review Comment: If it throws an exception when loading the jobState (which I presume would lead to an empty jobState) we should just throw an ApplicationException in this scenario, so that we can push the Temporal framework to retry the workflow rather than progressing silently. Issue Time Tracking ------------------- Worklog Id: (was: 934586) Time Spent: 2h 10m (was: 2h) > Add SearchAttributes to filter Temporal Flows in the UI > ------------------------------------------------------- > > Key: GOBBLIN-2153 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2153 > Project: Apache Gobblin > Issue Type: Improvement > Reporter: Aditya Pratap Singh > Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > Add SearchAttributes to filter Temporal Flows in the UI -- This message was sent by Atlassian Jira (v8.20.10#820010)