This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8a8d230d3f [GOBBLIN-2153] Added search attributes to Temporal workflows (#4052)
8a8d230d3f is described below
commit 8a8d230d3f0a8d3e9a209330183d4189481f5e79
Author: pratapaditya04 <[email protected]>
AuthorDate: Wed Sep 18 00:44:04 2024 +0530
[GOBBLIN-2153] Added search attributes to Temporal workflows (#4052)
Added search attributes to Temporal workflows
---------
Co-authored-by: Aditya Pratap Singh <[email protected]>
---
.../ddm/launcher/ExecuteGobblinJobLauncher.java | 2 +
.../ddm/launcher/GenerateWorkUnitsJobLauncher.java | 2 +
.../ddm/launcher/ProcessWorkUnitsJobLauncher.java | 3 +
.../temporal/ddm/util/TemporalWorkFlowUtils.java | 70 +++++++++++++
.../gobblin/temporal/ddm/work/assistance/Help.java | 2 +
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 2 +
.../impl/ProcessWorkUnitsWorkflowImpl.java | 43 +++++---
.../launcher/GenArbitraryLoadJobLauncher.java | 5 +-
.../workflow/AbstractNestingExecWorkflowImpl.java | 2 +
.../helloworld/HelloWorldJobLauncher.java | 4 +-
.../ddm/util/TemporalWorkFlowUtilsTest.java | 111 +++++++++++++++++++++
11 files changed, 230 insertions(+), 16 deletions(-)
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 6efb93d128..4c2e44c5bd 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -45,6 +45,7 @@ import
org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -85,6 +86,7 @@ public class ExecuteGobblinJobLauncher extends
GobblinTemporalJobLauncher {
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(finalProps)))
+
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(finalProps))
.build();
ExecuteGobblinWorkflow workflow =
this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
index 68b7910bcd..ce0d8b732b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/GenerateWorkUnitsJobLauncher.java
@@ -41,6 +41,7 @@ import
org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
/**
@@ -72,6 +73,7 @@ public class GenerateWorkUnitsJobLauncher extends
GobblinTemporalJobLauncher {
try {
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
+
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(jobProps)))
.build();
GenerateWorkUnitsWorkflow workflow =
this.client.newWorkflowStub(GenerateWorkUnitsWorkflow.class, options);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index fcd520714c..d0ce87c05d 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -46,6 +46,8 @@ import
org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
+
import static
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;
@@ -102,6 +104,7 @@ public class ProcessWorkUnitsJobLauncher extends
GobblinTemporalJobLauncher {
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
+
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec,
ConfigFactory.parseProperties(jobProps)))
.build();
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtils.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtils.java
new file mode 100644
index 0000000000..ba6b3ed209
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.NonNull;
+import lombok.experimental.UtilityClass;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+
+
+/**
+ * Utility class for handling Temporal workflow-related operations.
+ */
+@UtilityClass
+public class TemporalWorkFlowUtils {
+
+ public static final String DEFAULT_GAAS_FLOW_GROUP =
"DEFAULT_GAAS_FLOW_GROUP";
+ public static final String DEFAULT_GAAS_FLOW_NAME = "DEFAULT_GAAS_FLOW_NAME";
+ public static final String DEFAULT_GAAS_USER_TO_PROXY =
"DEFAULT_GAAS_USER_TO_PROXY";
+
+ /**
+ * Generates search attributes for a WorkFlow based on the provided GAAS
job properties.
+ *
+ * @param jobProps the properties of the job, must not be null.
+ * @return a map containing the generated search attributes.
+ */
+ public static Map<String, Object> generateGaasSearchAttributes(@NonNull
Properties jobProps) {
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put(Help.GAAS_FLOW_ID_SEARCH_KEY,
+ String.format("%s.%s",
jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, DEFAULT_GAAS_FLOW_GROUP),
+ jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY,
DEFAULT_GAAS_FLOW_NAME)));
+ attributes.put(Help.USER_TO_PROXY_SEARCH_KEY,
jobProps.getProperty(Help.USER_TO_PROXY_KEY, DEFAULT_GAAS_USER_TO_PROXY));
+ return attributes;
+ }
+
+ /**
+ * Converts search attribute values from a map of lists to a map of objects.
+ *
+ * @param searchAttributes a map where the keys are attribute names and the
values are lists of attribute values.
+ * Can be null.
+ * @return a map where the keys are attribute names and the values are the
corresponding attribute values.
+ * If the input map is null, an empty map is returned.
+ */
+ public static Map<String, Object>
convertSearchAttributesValuesFromListToObject(
+ Map<String, List<?>> searchAttributes) {
+ if (searchAttributes == null) {
+ return Collections.emptyMap();
+ }
+ return new HashMap<>(searchAttributes);
+ }
+}
\ No newline at end of file
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index e2011d7a9c..2c3b34fce0 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -61,6 +61,8 @@ public class Help {
public static final int LOG_CACHE_STATS_EVERY_N_ACCESSES = 1000;
public static final String AZKABAN_FLOW_EXEC_ID_KEY = "azkaban.flow.execid";
public static final String USER_TO_PROXY_KEY = "user.to.proxy";
+ public static final String USER_TO_PROXY_SEARCH_KEY = "userToProxy";
+ public static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey";
// treat `JobState` as immutable and cache, for reuse among activities
executed by the same worker
private static final transient Cache<Path, JobState> jobStateByPath =
CacheBuilder.newBuilder().recordStats().build();
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 8a04e6e519..8eab3ef0bd 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -56,6 +56,7 @@ import
org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -152,6 +153,7 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(PROCESS_WORKFLOW_ID_BASE,
ConfigFactory.parseProperties(jobProps)))
+
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(jobProps))
.build();
return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class,
childOpts);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 975b9f6043..ccd2979491 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.temporal.ddm.workflow.impl;
+import java.util.Map;
import java.util.Optional;
import com.typesafe.config.ConfigFactory;
@@ -26,6 +27,7 @@ import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
@@ -40,6 +42,7 @@ import
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.runtime.JobState;
@Slf4j
@@ -56,12 +59,24 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
}
private CommitStats performWork(WUProcessingSpec workSpec) {
+
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
- NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec);
- int workunitsProcessed =
processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
- workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
+ Map<String, Object> searchAttributes;
+ JobState jobState;
+ try {
+ jobState = Help.loadJobState(workSpec, Help.loadFileSystem(workSpec));
+ } catch (Exception e) {
+ log.error("Exception occured during loading jobState", e);
+ throw new RuntimeException("Exception occured during loading jobState",
e);
+ }
+ searchAttributes =
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
+
+ NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec, searchAttributes);
+ int workunitsProcessed =
+ processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(),
+ workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
if (workunitsProcessed > 0) {
- CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
+ CommitStepWorkflow commitWorkflow =
createCommitStepWorkflow(searchAttributes);
CommitStats result = commitWorkflow.commit(workSpec);
if (result.getNumCommittedWorkUnits() == 0) {
log.warn("No work units committed at the job level. They could have
been committed at the task level.");
@@ -84,26 +99,28 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
}
protected Workload<WorkUnitClaimCheck> createWorkload(WUProcessingSpec
workSpec) {
- return new EagerFsDirBackedWorkUnitClaimCheckWorkload(
- workSpec.getFileSystemUri(),
- workSpec.getWorkUnitsDir(),
- workSpec.getEventSubmitterContext()
- );
+ return new
EagerFsDirBackedWorkUnitClaimCheckWorkload(workSpec.getFileSystemUri(),
workSpec.getWorkUnitsDir(),
+ workSpec.getEventSubmitterContext());
}
- protected NestingExecWorkflow<WorkUnitClaimCheck>
createProcessingWorkflow(FileSystemJobStateful f) {
+ protected NestingExecWorkflow<WorkUnitClaimCheck>
createProcessingWorkflow(FileSystemJobStateful f,
+ Map<String, Object> searchAttributes) {
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
-
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+ .setSearchAttributes(searchAttributes)
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
+ WorkerConfig.of(this).orElse(ConfigFactory.empty())))
.build();
// TODO: to incorporate multiple different concrete `NestingExecWorkflow`
sub-workflows in the same super-workflow... shall we use queues?!?!?
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
}
- protected CommitStepWorkflow createCommitStepWorkflow() {
+ protected CommitStepWorkflow createCommitStepWorkflow(Map<String, Object>
searchAttributes) {
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
-
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+ .setSearchAttributes(searchAttributes)
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
+ WorkerConfig.of(this).orElse(ConfigFactory.empty())))
.build();
return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
index b1f103c7b0..86f1d4a092 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
@@ -42,6 +42,8 @@ import
org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
+
import static
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX;
@@ -79,12 +81,11 @@ public class GenArbitraryLoadJobLauncher extends
GobblinTemporalJobLauncher {
int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(this.jobProps,
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_MAX_SUB_TREES_PER_TREE);
Workload<IllustrationItem> workload =
SimpleGeneratedWorkload.createAs(numActivities);
- WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(this.queueName).build();
+ WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(this.queueName).setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps)).build();
// WARNING: although type param must agree w/ that of `workload`, it's
entirely unverified by type checker!
// ...and more to the point, mismatch would occur at runtime
(`performWorkload` on the workflow type given to the stub)!
NestingExecWorkflow<IllustrationItem> workflow =
this.client.newWorkflowStub(NestingExecWorkflow.class, options);
-
workflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
maxBranchesPerTree, maxSubTreesPerTree, Optional.empty());
}
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
index 6bef7a609c..92ef6e1af9 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
@@ -34,6 +34,7 @@ import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
@@ -111,6 +112,7 @@ public abstract class
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
.setWorkflowId(childWorkflowId)
+
.setSearchAttributes(TemporalWorkFlowUtils.convertSearchAttributesValuesFromListToObject(Workflow.getSearchAttributes()))
.build();
return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
index 8ea47b67a1..db076d6391 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
@@ -36,6 +36,8 @@ import
org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
+
/**
@@ -57,7 +59,7 @@ public class HelloWorldJobLauncher extends
GobblinTemporalJobLauncher {
@Override
public void submitJob(List<WorkUnit> workunits) {
- WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(queueName).build();
+ WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(queueName).setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps)).build();
GreetingWorkflow greetingWorkflow =
this.client.newWorkflowStub(GreetingWorkflow.class, options);
EventSubmitterContext eventSubmitterContext = new
EventSubmitterContext.Builder(eventSubmitter).build();
String greeting = greetingWorkflow.getGreeting("Gobblin",
eventSubmitterContext);
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtilsTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtilsTest.java
new file mode 100644
index 0000000000..07fa8291b3
--- /dev/null
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/util/TemporalWorkFlowUtilsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TemporalWorkFlowUtilsTest {
+
+ @Test
+ public void testGenerateGaasSearchAttributes() {
+ Properties jobProps = new Properties();
+ jobProps.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, "group");
+ jobProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "name");
+ jobProps.setProperty(Help.USER_TO_PROXY_KEY, "userProxy");
+
+ // Expected attributes
+ Map<String, Object> expectedAttributes = new HashMap<>();
+ expectedAttributes.put(Help.GAAS_FLOW_ID_SEARCH_KEY, "group.name");
+ expectedAttributes.put(Help.USER_TO_PROXY_SEARCH_KEY, "userProxy");
+
+ // Actual attributes
+ Map<String, Object> actualAttributes =
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobProps);
+
+ // Assertions
+ Assert.assertEquals(expectedAttributes.get(Help.GAAS_FLOW_ID_SEARCH_KEY),
actualAttributes.get(Help.GAAS_FLOW_ID_SEARCH_KEY));
+ Assert.assertEquals(expectedAttributes.get(Help.USER_TO_PROXY_SEARCH_KEY),
+ actualAttributes.get(Help.USER_TO_PROXY_SEARCH_KEY));
+ }
+
+ @Test
+ public void testGenerateGaasSearchAttributesWithMissingFlowGroup() {
+ Properties jobProps = new Properties();
+ jobProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "name");
+ jobProps.setProperty(Help.USER_TO_PROXY_KEY, "userProxy");
+
+ // Expected attributes
+ Map<String, Object> expectedAttributes = new HashMap<>();
+ expectedAttributes.put(Help.GAAS_FLOW_ID_SEARCH_KEY,
TemporalWorkFlowUtils.DEFAULT_GAAS_FLOW_GROUP + "." + "name");
+ expectedAttributes.put(Help.USER_TO_PROXY_SEARCH_KEY, "userProxy");
+
+ // Actual attributes
+ Map<String, Object> actualAttributes =
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobProps);
+
+ // Assertions
+ Assert.assertEquals(expectedAttributes.get(Help.GAAS_FLOW_ID_SEARCH_KEY),
actualAttributes.get(Help.GAAS_FLOW_ID_SEARCH_KEY));
+ Assert.assertEquals(expectedAttributes.get(Help.USER_TO_PROXY_SEARCH_KEY),
+ actualAttributes.get(Help.USER_TO_PROXY_SEARCH_KEY));
+ }
+
+ @Test
+ public void testGenerateGaasSearchAttributesWithNullProps() {
+ // Expecting a NullPointerException when jobProps is null
+ Assert.assertThrows(NullPointerException.class, () -> {
+ TemporalWorkFlowUtils.generateGaasSearchAttributes(null);
+ });
+ }
+
+ @Test
+ public void
testConvertSearchAttributesValuesFromListToObject_withValidInput() {
+ Map<String, List<?>> searchAttributes = new HashMap<>();
+ searchAttributes.put("key1", Arrays.asList("value1", "value2"));
+ searchAttributes.put("key2", Arrays.asList(1, 2, 3));
+
+ Map<String, Object> result =
TemporalWorkFlowUtils.convertSearchAttributesValuesFromListToObject(searchAttributes);
+
+ Assert.assertEquals(result.size(), 2);
+ Assert.assertEquals(result.get("key1"), Arrays.asList("value1", "value2"));
+ Assert.assertEquals(result.get("key2"), Arrays.asList(1, 2, 3));
+ }
+
+ @Test
+ public void
testConvertSearchAttributesValuesFromListToObject_withEmptyInput() {
+ Map<String, List<?>> searchAttributes = new HashMap<>();
+
+ Map<String, Object> result =
TemporalWorkFlowUtils.convertSearchAttributesValuesFromListToObject(searchAttributes);
+
+ Assert.assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void
testConvertSearchAttributesValuesFromListToObject_withNullInput() {
+ Map<String, List<?>> searchAttributes = null;
+
+ Map<String, Object> result =
TemporalWorkFlowUtils.convertSearchAttributesValuesFromListToObject(searchAttributes);
+
+ Assert.assertTrue(result.isEmpty());
+ }
+}