This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a8d230d3f [GOBBLIN-2153] Added search attributes to Temporal workflows (#4052)
8a8d230d3f is described below

commit 8a8d230d3f0a8d3e9a209330183d4189481f5e79
Author: pratapaditya04 <[email protected]>
AuthorDate: Wed Sep 18 00:44:04 2024 +0530

    [GOBBLIN-2153] Added search attributes to Temporal workflows (#4052)
    
    Added search attributes to Temporal workflows
    ---------
    
    Co-authored-by: Aditya Pratap Singh <[email protected]>
---
 .../ddm/launcher/ExecuteGobblinJobLauncher.java    |   2 +
 .../ddm/launcher/GenerateWorkUnitsJobLauncher.java |   2 +
 .../ddm/launcher/ProcessWorkUnitsJobLauncher.java  |   3 +
 .../temporal/ddm/util/TemporalWorkFlowUtils.java   |  70 +++++++++++++
 .../gobblin/temporal/ddm/work/assistance/Help.java |   2 +
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  |   2 +
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |  43 +++++---
 .../launcher/GenArbitraryLoadJobLauncher.java      |   5 +-
 .../workflow/AbstractNestingExecWorkflowImpl.java  |   2 +
 .../helloworld/HelloWorldJobLauncher.java          |   4 +-
 .../ddm/util/TemporalWorkFlowUtilsTest.java        | 111 +++++++++++++++++++++
 11 files changed, 230 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 6efb93d128..4c2e44c5bd 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -45,6 +45,7 @@ import 
org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.PropertiesUtils;
@@ -85,6 +86,7 @@ public class ExecuteGobblinJobLauncher extends 
GobblinTemporalJobLauncher {
       WorkflowOptions options = WorkflowOptions.newBuilder()
           .setTaskQueue(this.queueName)
           
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(finalProps)))
+          
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(finalProps))
           .build();
       ExecuteGobblinWorkflow workflow = 
this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options);
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
index 68b7910bcd..ce0d8b732b 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
@@ -41,6 +41,7 @@ import 
org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
 
 
 /**
@@ -72,6 +73,7 @@ public class GenerateWorkUnitsJobLauncher extends 
GobblinTemporalJobLauncher {
     try {
       WorkflowOptions options = WorkflowOptions.newBuilder()
           .setTaskQueue(this.queueName)
+          
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
           
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(jobProps)))
           .build();
       GenerateWorkUnitsWorkflow workflow = 
this.client.newWorkflowStub(GenerateWorkUnitsWorkflow.class, options);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index fcd520714c..d0ce87c05d 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -46,6 +46,8 @@ import 
org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
+
 
 import static 
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;
 
@@ -102,6 +104,7 @@ public class ProcessWorkUnitsJobLauncher extends 
GobblinTemporalJobLauncher {
 
       WorkflowOptions options = WorkflowOptions.newBuilder()
           .setTaskQueue(this.queueName)
+          
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
           
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, 
ConfigFactory.parseProperties(jobProps)))
           .build();
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtils.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtils.java
new file mode 100644
index 0000000000..ba6b3ed209
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.NonNull;
+import lombok.experimental.UtilityClass;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+
+
+/**
+ * Utility class for handling Temporal workflow-related operations.
+ */
+@UtilityClass
+public class TemporalWorkFlowUtils {
+
+  public static final String DEFAULT_GAAS_FLOW_GROUP = 
"DEFAULT_GAAS_FLOW_GROUP";
+  public static final String DEFAULT_GAAS_FLOW_NAME = "DEFAULT_GAAS_FLOW_NAME";
+  public static final String DEFAULT_GAAS_USER_TO_PROXY = 
"DEFAULT_GAAS_USER_TO_PROXY";
+
+  /**
+   * Generates search attributes for a WorkFlow  based on the provided GAAS 
job properties.
+   *
+   * @param jobProps the properties of the job, must not be null.
+   * @return a map containing the generated search attributes.
+   */
+  public static Map<String, Object> generateGaasSearchAttributes(@NonNull 
Properties jobProps) {
+    Map<String, Object> attributes = new HashMap<>();
+    attributes.put(Help.GAAS_FLOW_ID_SEARCH_KEY,
+        String.format("%s.%s", 
jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, DEFAULT_GAAS_FLOW_GROUP),
+            jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY, 
DEFAULT_GAAS_FLOW_NAME)));
+    attributes.put(Help.USER_TO_PROXY_SEARCH_KEY, 
jobProps.getProperty(Help.USER_TO_PROXY_KEY, DEFAULT_GAAS_USER_TO_PROXY));
+    return attributes;
+  }
+
+  /**
+   * Converts search attribute values from a map of lists to a map of objects.
+   *
+   * @param searchAttributes a map where the keys are attribute names and the 
values are lists of attribute values.
+   *                         Can be null.
+   * @return a map where the keys are attribute names and the values are the 
corresponding attribute values.
+   *         If the input map is null, an empty map is returned.
+   */
+  public static Map<String, Object> 
convertSearchAttributesValuesFromListToObject(
+      Map<String, List<?>> searchAttributes) {
+    if (searchAttributes == null) {
+      return Collections.emptyMap();
+    }
+    return new HashMap<>(searchAttributes);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index e2011d7a9c..2c3b34fce0 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -61,6 +61,8 @@ public class Help {
   public static final int LOG_CACHE_STATS_EVERY_N_ACCESSES = 1000;
   public static final String AZKABAN_FLOW_EXEC_ID_KEY = "azkaban.flow.execid";
   public static final String USER_TO_PROXY_KEY = "user.to.proxy";
+  public static final String USER_TO_PROXY_SEARCH_KEY = "userToProxy";
+  public static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey";
 
   // treat `JobState` as immutable and cache, for reuse among activities 
executed by the same worker
   private static final transient Cache<Path, JobState> jobStateByPath = 
CacheBuilder.newBuilder().recordStats().build();
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 8a04e6e519..8eab3ef0bd 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -56,6 +56,7 @@ import 
org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
 import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
 
@@ -152,6 +153,7 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
         
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(PROCESS_WORKFLOW_ID_BASE, 
ConfigFactory.parseProperties(jobProps)))
+        
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(jobProps))
         .build();
     return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, 
childOpts);
   }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 975b9f6043..ccd2979491 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.temporal.ddm.workflow.impl;
 
+import java.util.Map;
 import java.util.Optional;
 
 import com.typesafe.config.ConfigFactory;
@@ -26,6 +27,7 @@ import io.temporal.workflow.Workflow;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import 
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
@@ -40,6 +42,7 @@ import 
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
 import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.runtime.JobState;
 
 
 @Slf4j
@@ -56,12 +59,24 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
   }
 
   private CommitStats performWork(WUProcessingSpec workSpec) {
+
     Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
-    NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec);
-    int workunitsProcessed = 
processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
-        workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
+    Map<String, Object> searchAttributes;
+    JobState jobState;
+    try {
+      jobState = Help.loadJobState(workSpec, Help.loadFileSystem(workSpec));
+    } catch (Exception e) {
+      log.error("Exception occured during loading jobState", e);
+      throw new RuntimeException("Exception occured during loading jobState", 
e);
+    }
+    searchAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
+
+    NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec, searchAttributes);
+    int workunitsProcessed =
+        processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, 
workSpec.getTuning().getMaxBranchesPerTree(),
+            workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
     if (workunitsProcessed > 0) {
-      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
+      CommitStepWorkflow commitWorkflow = 
createCommitStepWorkflow(searchAttributes);
       CommitStats result = commitWorkflow.commit(workSpec);
       if (result.getNumCommittedWorkUnits() == 0) {
         log.warn("No work units committed at the job level. They could have 
been committed at the task level.");
@@ -84,26 +99,28 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
   }
 
   protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec 
workSpec) {
-    return new EagerFsDirBackedWorkUnitClaimCheckWorkload(
-        workSpec.getFileSystemUri(),
-        workSpec.getWorkUnitsDir(),
-        workSpec.getEventSubmitterContext()
-    );
+    return new 
EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(), 
workSpec.getWorkUnitsDir(),
+        workSpec.getEventSubmitterContext());
   }
 
-  protected NestingExecWorkflow<WorkUnitClaimCheck> 
createProcessingWorkflow(FileSystemJobStateful f) {
+  protected NestingExecWorkflow<WorkUnitClaimCheck> 
createProcessingWorkflow(FileSystemJobStateful f,
+      Map<String, Object> searchAttributes) {
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
-        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f, 
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+        .setSearchAttributes(searchAttributes)
+        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
+            WorkerConfig.of(this).orElse(ConfigFactory.empty())))
         .build();
     // TODO: to incorporate multiple different concrete `NestingExecWorkflow` 
sub-workflows in the same super-workflow... shall we use queues?!?!?
     return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
   }
 
-  protected CommitStepWorkflow createCommitStepWorkflow() {
+  protected CommitStepWorkflow createCommitStepWorkflow(Map<String, Object> 
searchAttributes) {
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
-        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
 WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+        .setSearchAttributes(searchAttributes)
+        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
+            WorkerConfig.of(this).orElse(ConfigFactory.empty())))
         .build();
 
     return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
index b1f103c7b0..86f1d4a092 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
@@ -42,6 +42,8 @@ import 
org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
 import org.apache.gobblin.temporal.util.nesting.work.Workload;
 import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
 import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
+
 
 import static 
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;
 
@@ -79,12 +81,11 @@ public class GenArbitraryLoadJobLauncher extends 
GobblinTemporalJobLauncher {
     int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(this.jobProps, 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_SUB_TREES_PER_TREE);
 
     Workload<IllustrationItem> workload = 
SimpleGeneratedWorkload.createAs(numActivities);
-    WorkflowOptions options = 
WorkflowOptions.newBuilder().setTaskQueue(this.queueName).build();
+    WorkflowOptions options = 
WorkflowOptions.newBuilder().setTaskQueue(this.queueName).setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps)).build();
 
     // WARNING: although type param must agree w/ that of `workload`, it's 
entirely unverified by type checker!
     // ...and more to the point, mismatch would occur at runtime 
(`performWorkload` on the workflow type given to the stub)!
     NestingExecWorkflow<IllustrationItem> workflow = 
this.client.newWorkflowStub(NestingExecWorkflow.class, options);
-
     workflow.performWorkload(WorkflowAddr.ROOT, workload, 0, 
maxBranchesPerTree, maxSubTreesPerTree, Optional.empty());
   }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
index 6bef7a609c..92ef6e1af9 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
@@ -34,6 +34,7 @@ import io.temporal.workflow.Promise;
 import io.temporal.workflow.Workflow;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
 import org.apache.gobblin.temporal.util.nesting.work.Workload;
 
@@ -111,6 +112,7 @@ public abstract class 
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
         .setWorkflowId(childWorkflowId)
+        
.setSearchAttributes(TemporalWorkFlowUtils.convertSearchAttributesValuesFromListToObject(Workflow.getSearchAttributes()))
         .build();
     return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
   }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
index 8ea47b67a1..db076d6391 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
@@ -36,6 +36,8 @@ import 
org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
+
 
 
 /**
@@ -57,7 +59,7 @@ public class HelloWorldJobLauncher extends 
GobblinTemporalJobLauncher {
 
   @Override
   public void submitJob(List<WorkUnit> workunits) {
-    WorkflowOptions options = 
WorkflowOptions.newBuilder().setTaskQueue(queueName).build();
+    WorkflowOptions options = 
WorkflowOptions.newBuilder().setTaskQueue(queueName).setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps)).build();
     GreetingWorkflow greetingWorkflow = 
this.client.newWorkflowStub(GreetingWorkflow.class, options);
     EventSubmitterContext eventSubmitterContext = new 
EventSubmitterContext.Builder(eventSubmitter).build();
     String greeting = greetingWorkflow.getGreeting("Gobblin", 
eventSubmitterContext);
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtilsTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtilsTest.java
new file mode 100644
index 0000000000..07fa8291b3
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtilsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TemporalWorkFlowUtilsTest {
+
+  @Test
+  public void testGenerateGaasSearchAttributes() {
+    Properties jobProps = new Properties();
+    jobProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, "group");
+    jobProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "name");
+    jobProps.setProperty(Help.USER_TO_PROXY_KEY, "userProxy");
+
+    // Expected attributes
+    Map<String, Object> expectedAttributes = new HashMap<>();
+    expectedAttributes.put(Help.GAAS_FLOW_ID_SEARCH_KEY, "group.name");
+    expectedAttributes.put(Help.USER_TO_PROXY_SEARCH_KEY, "userProxy");
+
+    // Actual attributes
+    Map<String, Object> actualAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobProps);
+
+    // Assertions
+    Assert.assertEquals(expectedAttributes.get(Help.GAAS_FLOW_ID_SEARCH_KEY), 
actualAttributes.get(Help.GAAS_FLOW_ID_SEARCH_KEY));
+    Assert.assertEquals(expectedAttributes.get(Help.USER_TO_PROXY_SEARCH_KEY),
+        actualAttributes.get(Help.USER_TO_PROXY_SEARCH_KEY));
+  }
+
+  @Test
+  public void testGenerateGaasSearchAttributesWithMissingFlowGroup() {
+    Properties jobProps = new Properties();
+    jobProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "name");
+    jobProps.setProperty(Help.USER_TO_PROXY_KEY, "userProxy");
+
+    // Expected attributes
+    Map<String, Object> expectedAttributes = new HashMap<>();
+    expectedAttributes.put(Help.GAAS_FLOW_ID_SEARCH_KEY, 
TemporalWorkFlowUtils.DEFAULT_GAAS_FLOW_GROUP + "." + "name");
+    expectedAttributes.put(Help.USER_TO_PROXY_SEARCH_KEY, "userProxy");
+
+    // Actual attributes
+    Map<String, Object> actualAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobProps);
+
+    // Assertions
+    Assert.assertEquals(expectedAttributes.get(Help.GAAS_FLOW_ID_SEARCH_KEY), 
actualAttributes.get(Help.GAAS_FLOW_ID_SEARCH_KEY));
+    Assert.assertEquals(expectedAttributes.get(Help.USER_TO_PROXY_SEARCH_KEY),
+        actualAttributes.get(Help.USER_TO_PROXY_SEARCH_KEY));
+  }
+
+  @Test
+  public void testGenerateGaasSearchAttributesWithNullProps() {
+    // Expecting a NullPointerException when jobProps is null
+    Assert.assertThrows(NullPointerException.class, () -> {
+      TemporalWorkFlowUtils.generateGaasSearchAttributes(null);
+    });
+  }
+
+  @Test
+  public void 
testConvertSearchAttributesValuesFromListToObject_withValidInput() {
+    Map<String, List<?>> searchAttributes = new HashMap<>();
+    searchAttributes.put("key1", Arrays.asList("value1", "value2"));
+    searchAttributes.put("key2", Arrays.asList(1, 2, 3));
+
+    Map<String, Object> result = 
TemporalWorkFlowUtils.convertSearchAttributesValuesFromListToObject(searchAttributes);
+
+    Assert.assertEquals(result.size(), 2);
+    Assert.assertEquals(result.get("key1"), Arrays.asList("value1", "value2"));
+    Assert.assertEquals(result.get("key2"), Arrays.asList(1, 2, 3));
+  }
+
+  @Test
+  public void 
testConvertSearchAttributesValuesFromListToObject_withEmptyInput() {
+    Map<String, List<?>> searchAttributes = new HashMap<>();
+
+    Map<String, Object> result = 
TemporalWorkFlowUtils.convertSearchAttributesValuesFromListToObject(searchAttributes);
+
+    Assert.assertTrue(result.isEmpty());
+  }
+
+  @Test
+  public void 
testConvertSearchAttributesValuesFromListToObject_withNullInput() {
+    Map<String, List<?>> searchAttributes = null;
+
+    Map<String, Object> result = 
TemporalWorkFlowUtils.convertSearchAttributesValuesFromListToObject(searchAttributes);
+
+    Assert.assertTrue(result.isEmpty());
+  }
+}

Reply via email to