[ 
https://issues.apache.org/jira/browse/GOBBLIN-2153?focusedWorklogId=934379&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934379
 ]

ASF GitHub Bot logged work on GOBBLIN-2153:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Sep/24 01:24
            Start Date: 12/Sep/24 01:24
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #4052:
URL: https://github.com/apache/gobblin/pull/4052#discussion_r1755945665


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtils.java:
##########
@@ -0,0 +1,52 @@
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.NonNull;
+import lombok.experimental.UtilityClass;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+
+
+/**
+ * Utility class for handling Temporal workflow-related operations.
+ */
+@UtilityClass
+public class TemporalWorkFlowUtils {
+
+  /**
+   * Generates search attributes for a WorkFlow  based on the provided GAAS job properties.
+   *
+   * @param jobProps the properties of the job, must not be null.
+   * @return a map containing the generated search attributes.
+   */
+  public static Map<String, Object> generateGaasSearchAttributes(@NonNull Properties jobProps) {
+    Map<String, Object> attributes = new HashMap<>();
+    attributes.put(Help.GAAS_FLOW_KEY, String.format("%s.%s", jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY),

Review Comment:
   We should call this GAAS_FLOW_ID_SEARCH_KEY, since flowGroup + flowName is the unique identifier for a given flow config in GaaS.
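
   A minimal sketch of what the suggested rename could look like. Everything here is an assumption made for illustration: the property key strings, the value of the search key, and the use of the flow name as the second format argument (the hunk above is cut off before that argument) are hypothetical stand-ins, not the constants from ConfigurationKeys or Help.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

final class GaasSearchAttributesSketch {
  // Hypothetical stand-ins for ConfigurationKeys.FLOW_GROUP_KEY and the flow-name key.
  static final String FLOW_GROUP_KEY = "flow.group";
  static final String FLOW_NAME_KEY = "flow.name";
  // Hypothetical value; only the constant's name comes from the review comment.
  static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey";

  /** Builds a single search attribute of the form "<flowGroup>.<flowName>". */
  static Map<String, Object> generateGaasSearchAttributes(Properties jobProps) {
    Objects.requireNonNull(jobProps, "jobProps must not be null");
    Map<String, Object> attributes = new HashMap<>();
    attributes.put(GAAS_FLOW_ID_SEARCH_KEY, String.format("%s.%s",
        jobProps.getProperty(FLOW_GROUP_KEY), jobProps.getProperty(FLOW_NAME_KEY)));
    return attributes;
  }

  public static void main(String[] args) {
    Properties jobProps = new Properties();
    jobProps.setProperty(FLOW_GROUP_KEY, "exampleGroup");
    jobProps.setProperty(FLOW_NAME_KEY, "exampleFlow");
    System.out.println(generateGaasSearchAttributes(jobProps));
  }
}
```

   Naming the key after the flow ID makes it clear that the attribute value identifies one flow config rather than a single execution of it.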



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -57,21 +59,25 @@ public CommitStats process(WUProcessingSpec workSpec) {
   }
 
   private CommitStats performWork(WUProcessingSpec workSpec) {
-    Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
-    NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec);
-    int workunitsProcessed = processingWorkflow.performWorkload(
-        WorkflowAddr.ROOT, workload, 0,
-        workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
-    );
-    if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
-      CommitStats result = commitWorkflow.commit(workSpec);
-      if (result.getNumCommittedWorkUnits() == 0) {
-        log.warn("No work units committed at the job level. They could have been committed at the task level.");
+    try {
+      Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, Help.loadFileSystem(workSpec));
+      NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, jobState);
+      int workunitsProcessed = processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+          workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
+      if (workunitsProcessed > 0) {
+        CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(jobState);
+        CommitStats result = commitWorkflow.commit(workSpec);
+        if (result.getNumCommittedWorkUnits() == 0) {
+          log.warn("No work units committed at the job level. They could have been committed at the task level.");
+        }
+        return result;
+      } else {
+        log.error("No work units processed, so no commit attempted.");
+        return CommitStats.createEmpty();
       }
-      return result;
-    } else {
-      log.error("No work units processed, so no commit attempted.");
+    } catch (Exception ignored) {
+      log.error("Exception occured during performing Work", ignored);
       return CommitStats.createEmpty();

Review Comment:
   What's the rationale for this try/catch? It seems dangerous to swallow exceptions during the ProcessWorkUnits step; shouldn't it fail loudly?
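
   To illustrate the concern, here is a small self-contained sketch contrasting the two behaviours. The Stats class and both method names are hypothetical stand-ins (they are not CommitStats or any Gobblin or Temporal API), and how a rethrown exception would surface in an actual workflow is not modelled here.

```java
import java.util.function.Supplier;

final class FailLoudlySketch {
  /** Hypothetical stand-in for a commit-stats result. */
  static final class Stats {
    final int numCommittedWorkUnits;

    Stats(int numCommittedWorkUnits) {
      this.numCommittedWorkUnits = numCommittedWorkUnits;
    }

    static Stats createEmpty() {
      return new Stats(0);
    }
  }

  // Pattern questioned in the review: a failure is logged and then reported
  // as an empty result, so callers cannot tell it apart from "nothing to commit".
  static Stats performWorkSwallowing(Supplier<Stats> work) {
    try {
      return work.get();
    } catch (RuntimeException e) {
      System.err.println("Exception occurred while performing work: " + e);
      return Stats.createEmpty();
    }
  }

  // Alternative: log for context, then rethrow so the caller sees the failure.
  static Stats performWorkFailingLoudly(Supplier<Stats> work) {
    try {
      return work.get();
    } catch (RuntimeException e) {
      System.err.println("Exception occurred while performing work: " + e);
      throw e;
    }
  }

  public static void main(String[] args) {
    Supplier<Stats> failing = () -> {
      throw new IllegalStateException("simulated processing failure");
    };
    // Swallowing variant: the failure is hidden behind an empty result.
    System.out.println(performWorkSwallowing(failing).numCommittedWorkUnits);
    // Fail-loudly variant: the same failure reaches the caller.
    try {
      performWorkFailingLoudly(failing);
    } catch (IllegalStateException e) {
      System.out.println("caller saw: " + e.getMessage());
    }
  }
}
```

   With the swallowing variant, a job whose processing step threw would report zero committed work units and otherwise look like a successful no-op, which is the risk raised above.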





Issue Time Tracking
-------------------

    Worklog Id:     (was: 934379)
    Time Spent: 50m  (was: 40m)

> Add SearchAttributes to filter Temporal Flows in the UI
> -------------------------------------------------------
>
>                 Key: GOBBLIN-2153
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2153
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Aditya Pratap Singh
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Add SearchAttributes to filter Temporal Flows in the UI



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
