Will-Lo commented on code in PR #4052:
URL: https://github.com/apache/gobblin/pull/4052#discussion_r1755945665


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtils.java:
##########
@@ -0,0 +1,52 @@
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.NonNull;
+import lombok.experimental.UtilityClass;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+
+
+/**
+ * Utility class for handling Temporal workflow-related operations.
+ */
+@UtilityClass
+public class TemporalWorkFlowUtils {
+
+  /**
+   * Generates search attributes for a WorkFlow  based on the provided GAAS 
job properties.
+   *
+   * @param jobProps the properties of the job, must not be null.
+   * @return a map containing the generated search attributes.
+   */
+  public static Map<String, Object> generateGaasSearchAttributes(@NonNull 
Properties jobProps) {
+    Map<String, Object> attributes = new HashMap<>();
+    attributes.put(Help.GAAS_FLOW_KEY, String.format("%s.%s", 
jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY),

Review Comment:
   we should call this GAAS_FLOW_ID_SEARCH_KEY, as flowGroup + flowName is the 
UUID for a given flow config in GaaS



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -57,21 +59,25 @@ public CommitStats process(WUProcessingSpec workSpec) {
   }
 
   private CommitStats performWork(WUProcessingSpec workSpec) {
-    Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
-    NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec);
-    int workunitsProcessed = processingWorkflow.performWorkload(
-        WorkflowAddr.ROOT, workload, 0,
-        workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
-    );
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have 
been committed at the task level.");
+    try {
+      Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, 
Help.loadFileSystem(workSpec));
+      NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec, jobState);
+      int workunitsProcessed = 
processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
+      if (workunitsProcessed > 0) {
+        CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(jobState);
+        CommitStats result = commitWorkflow.commit(workSpec);
+        if (result.getNumCommittedWorkUnits() == 0) {
+          log.warn("No work units committed at the job level. They could have 
been committed at the task level.");
+        }
+        return result;
+      } else {
+        log.error("No work units processed, so no commit attempted.");
+        return CommitStats.createEmpty();
       }
-      return result;
-    } else {
-      log.error("No work units processed, so no commit attempted.");
+    } catch (Exception ignored) {
+      log.error("Exception occured during performing Work", ignored);
       return CommitStats.createEmpty();

Review Comment:
   What's the rationale for this try catch? Seems dangerous to ignore 
exceptions during processworkunits step, shouldn't we want it to fail loudly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to