This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8e9146ee4e [GOBBLIN-2245] Independent Dynamic Scaling for different
Activities in Temporal WorkFlow (#4159)
8e9146ee4e is described below
commit 8e9146ee4e19daee303606d33007a81d4f961e73
Author: Agam Pal Singh <[email protected]>
AuthorDate: Thu Jan 15 11:41:40 2026 +0530
[GOBBLIN-2245] Independent Dynamic Scaling for different Activities in
Temporal WorkFlow (#4159)
* independent memory and container configuration for temporal stages
* route non execute activities to default queue
* code refactoring
* code cleanup
* fix: spin up 1 container initially by default
* proper queue routing for commit phase
* code cleanup
* - spin up initial container with execution worker
- unit tests
* - only forward NestingExecWorkflow to execution queue
* removed redundant unit tests
* removed redundant unit tests
* copilot review fixes
* PR review fixes
* undo whitespace change
* PR review comments addressing
* PR review comments addressing
* updated property name
* removed unused import
---------
Co-authored-by: Agam Pal Singh <[email protected]>
---
.../temporal/GobblinTemporalConfigurationKeys.java | 22 ++
.../temporal/cluster/AbstractTemporalWorker.java | 14 +-
.../cluster/GobblinTemporalTaskRunner.java | 37 ++-
.../AbstractRecommendScalingForWorkUnitsImpl.java | 32 ++-
.../temporal/ddm/worker/ExecutionWorker.java | 107 ++++++++
.../temporal/ddm/worker/WorkFulfillmentWorker.java | 28 +-
.../impl/ProcessWorkUnitsWorkflowImpl.java | 28 +-
.../cluster/GobblinTemporalTaskRunnerTest.java | 147 +++++++++++
...stractRecommendScalingForWorkUnitsImplTest.java | 284 +++++++++++++++++++++
.../temporal/ddm/worker/ExecutionWorkerTest.java | 163 ++++++++++++
.../impl/ProcessWorkUnitsWorkflowImplTest.java | 255 ++++++++++++++++++
11 files changed, 1092 insertions(+), 25 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 0b95b78ef4..bc87fe1f55 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.temporal;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.temporal.ddm.worker.ExecutionWorker;
import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldJobLauncher;
import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldWorker;
@@ -29,14 +30,20 @@ import
org.apache.gobblin.temporal.workflows.helloworld.HelloWorldWorker;
public interface GobblinTemporalConfigurationKeys {
String PREFIX = "gobblin.temporal.";
+ String STAGE_SPECIFIC_PREFIX = PREFIX + "stage.";
String WORKER_CLASS = PREFIX + "worker.class";
String DEFAULT_WORKER_CLASS = HelloWorldWorker.class.getName();
+ String EXECUTION_WORKER_CLASS = ExecutionWorker.class.getName();
String GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
String DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
String GOBBLIN_TEMPORAL_TASK_QUEUE = PREFIX + "task.queue.name";
String DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue";
+
+ // Execution task queue for work execution specialization
+ String EXECUTION_TASK_QUEUE = PREFIX + "execution.task.queue.name";
+ String DEFAULT_EXECUTION_TASK_QUEUE = "GobblinTemporalExecutionQueue";
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX = PREFIX + "job.launcher.";
String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS =
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "class";
String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS =
HelloWorldJobLauncher.class.getName();
@@ -71,6 +78,17 @@ public interface GobblinTemporalConfigurationKeys {
int DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS = 1;
String TEMPORAL_NUM_THREADS_PER_WORKER = PREFIX + "num.threads.per.worker";
int DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER = 15;
+ String TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER = PREFIX +
"num.threads.per.execution.worker";
+
+ // Concurrency configs for WorkFulfillmentWorker
+ String TEMPORAL_MAX_CONCURRENT_ACTIVITY_SIZE = PREFIX +
"max.concurrent.activity.size";
+ String TEMPORAL_MAX_CONCURRENT_LOCAL_ACTIVITY_SIZE = PREFIX +
"max.concurrent.local.activity.size";
+ String TEMPORAL_MAX_CONCURRENT_WORKFLOW_TASK_SIZE = PREFIX +
"max.concurrent.workflow.task.size";
+
+ // Concurrency configs for ExecutionWorker
+ String TEMPORAL_EXECUTION_MAX_CONCURRENT_ACTIVITY_SIZE = PREFIX +
"execution.max.concurrent.activity.size";
+ String TEMPORAL_EXECUTION_MAX_CONCURRENT_LOCAL_ACTIVITY_SIZE = PREFIX +
"execution.max.concurrent.local.activity.size";
+ String TEMPORAL_EXECUTION_MAX_CONCURRENT_WORKFLOW_TASK_SIZE = PREFIX +
"execution.max.concurrent.workflow.task.size";
// Configuration key for setting the amortized throughput per worker thread
per minute
String TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE = PREFIX +
"worker.thread.amortized.throughput.per.minute";
@@ -136,4 +154,8 @@ public interface GobblinTemporalConfigurationKeys {
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS =
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts";
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4;
+ /**
+ * Memory allocation for execution worker containers.
+ */
+ String WORK_EXECUTION_MEMORY_MB = STAGE_SPECIFIC_PREFIX +
"workExecution.memory.mb";
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
index 8ab428c418..b569417f0b 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
@@ -33,16 +33,12 @@ import org.apache.gobblin.util.ConfigUtils;
/** Basic boilerplate for a {@link TemporalWorker} to register its activity
and workflow capabilities and listen on a particular queue */
public abstract class AbstractTemporalWorker implements TemporalWorker {
private final WorkflowClient workflowClient;
- private final String queueName;
private final WorkerFactory workerFactory;
- private final Config config;
+ protected final Config config;
public AbstractTemporalWorker(Config cfg, WorkflowClient client) {
config = cfg;
workflowClient = client;
- queueName = ConfigUtils.getString(cfg,
- GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE,
-
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
// Create a Worker factory that can be used to create Workers that
poll specific Task Queues.
workerFactory = WorkerFactory.newInstance(workflowClient);
@@ -52,7 +48,7 @@ public abstract class AbstractTemporalWorker implements
TemporalWorker {
@Override
public void start() {
- Worker worker = workerFactory.newWorker(queueName,
createWorkerOptions());
+ Worker worker = workerFactory.newWorker(getTaskQueue(),
createWorkerOptions());
// This Worker hosts both Workflow and Activity implementations.
// Workflows are stateful, so you need to supply a type to create
instances.
worker.registerWorkflowImplementationTypes(getWorkflowImplClasses());
@@ -77,6 +73,12 @@ public abstract class AbstractTemporalWorker implements
TemporalWorker {
/** @return activity instances; NOTE: activities must be stateless and
thread-safe, so a shared instance is used. */
protected abstract Object[] getActivityImplInstances();
+ protected String getTaskQueue() {
+ return ConfigUtils.getString(config,
+ GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE,
+
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
+ }
+
private final void stashWorkerConfig(Config cfg) {
// stash to associate with...
WorkerConfig.forWorker(this.getClass(), cfg); // the worker itself
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
index e7cd4315f0..b7a609c51f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
@@ -238,6 +238,7 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
for (int i = 0; i < this.numTemporalWorkers; i++) {
workers.add(initiateWorker());
}
+ initializeExecutionWorkers();
}catch (Exception e) {
logger.info(e + " for initiate workers");
throw new RuntimeException(e);
@@ -252,8 +253,8 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(
managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace);
- String workerClassName = ConfigUtils.getString(clusterConfig,
- GobblinTemporalConfigurationKeys.WORKER_CLASS,
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
+ String workerClassName = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.WORKER_CLASS,
+ GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
logger.info("Creating worker - class: '{}'", workerClassName);
Config workerConfig = clusterConfig;
TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor(
@@ -263,6 +264,38 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
return worker;
}
+ private void initializeExecutionWorkers() throws Exception {
+ boolean dynamicScalingEnabled = ConfigUtils.getBoolean(clusterConfig,
+ GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false);
+
+ if (!dynamicScalingEnabled) {
+ return;
+ }
+
+ String workerClassName = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.WORKER_CLASS,
+ GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
+ boolean isExecutionWorkerContainer =
GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS.equals(workerClassName);
+
+ // only the initial container (WorkFulfillment worker) should start an
additional ExecutionWorker
+ if (isExecutionWorkerContainer) {
+ return;
+ }
+
+ logger.info("Starting additional ExecutionWorker in initial container");
+
+ String namespace = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
+ GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
+ WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(
+ managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace);
+
+ TemporalWorker executionWorker =
GobblinConstructorUtils.invokeLongestConstructor(
+
(Class<TemporalWorker>)Class.forName(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS),
+ clusterConfig, client);
+ executionWorker.start();
+ workers.add(executionWorker);
+ logger.info("Worker started for class: {}",
GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS);
+ }
+
private void initMetricReporter() {
if (this.containerMetrics.isPresent()) {
try {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
index a0d3fd11e5..9390e66f48 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.temporal.ddm.activity.impl;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -25,6 +26,7 @@ import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
@@ -32,6 +34,7 @@ import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
/**
@@ -62,8 +65,9 @@ public abstract class
AbstractRecommendScalingForWorkUnitsImpl implements Recomm
protected abstract int calcDerivationSetPoint(WorkUnitsSizeSummary
remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState);
protected ProfileDerivation calcProfileDerivation(String basisProfileName,
WorkUnitsSizeSummary remainingWork, String sourceClass, JobState jobState) {
- // TODO: implement right-sizing!!! (for now just return unchanged)
- return new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged());
+ // Create overlay with execution-specific memory and worker class
+ ProfileOverlay overlay = createExecutionWorkerOverlay(jobState);
+ return new ProfileDerivation(basisProfileName, overlay);
}
protected String calcProfileDerivationName(JobState jobState) {
@@ -72,6 +76,28 @@ public abstract class
AbstractRecommendScalingForWorkUnitsImpl implements Recomm
}
protected String calcBasisProfileName(JobState jobState) {
- return WorkforceProfiles.BASELINE_NAME; // always build upon baseline
+ // Always derive from the global baseline
+ return WorkforceProfiles.BASELINE_NAME;
}
+
+ private ProfileOverlay createExecutionWorkerOverlay(JobState jobState) {
+ List<ProfileOverlay.KVPair> overlayPairs = new ArrayList<>();
+
+ // Add execution-specific memory if configured (overrides baseline memory)
+ if
(jobState.contains(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB)) {
+ overlayPairs.add(new ProfileOverlay.KVPair(
+ GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY,
+
jobState.getProp(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB)
+ ));
+ }
+
+ // Add ExecutionWorker class to ensure correct task queue routing
+ overlayPairs.add(new ProfileOverlay.KVPair(
+ GobblinTemporalConfigurationKeys.WORKER_CLASS,
+ GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS
+ ));
+
+ return new ProfileOverlay.Adding(overlayPairs);
+ }
+
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java
new file mode 100644
index 0000000000..452d23b1ad
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.worker;
+
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.worker.WorkerOptions;
+import lombok.AccessLevel;
+import lombok.Getter;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
+import
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
+import
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Specialized worker for Work Execution stage.
+ * This worker only registers activities for:
+ * - ProcessWorkUnit (Work Execution)
+ *
+ * Runs on containers with stage-specific memory for work execution operations.
+ * Polls the execution task queue to ensure activities run on
appropriately-sized containers.
+ */
+public class ExecutionWorker extends AbstractTemporalWorker {
+ public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120;
+ @Getter(AccessLevel.PACKAGE)
+ private final int maxConcurrentActivityExecutionSize;
+ @Getter(AccessLevel.PACKAGE)
+ private final int maxConcurrentLocalActivityExecutionSize;
+ @Getter(AccessLevel.PACKAGE)
+ private final int maxConcurrentWorkflowTaskExecutionSize;
+
+ public ExecutionWorker(Config config, WorkflowClient workflowClient) {
+ super(config, workflowClient);
+ int defaultThreadsPerWorker =
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER;
+
+ // Fallback chain: TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER ->
TEMPORAL_NUM_THREADS_PER_WORKER -> DEFAULT
+ int executionWorkerThreads = ConfigUtils.getInt(config,
+
GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER,
+ ConfigUtils.getInt(config,
GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
defaultThreadsPerWorker));
+
+ this.maxConcurrentActivityExecutionSize = ConfigUtils.getInt(config,
+
GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_ACTIVITY_SIZE,
+ executionWorkerThreads);
+ this.maxConcurrentLocalActivityExecutionSize =
ConfigUtils.getInt(config,
+
GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_LOCAL_ACTIVITY_SIZE,
+ executionWorkerThreads);
+ this.maxConcurrentWorkflowTaskExecutionSize =
ConfigUtils.getInt(config,
+
GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_WORKFLOW_TASK_SIZE,
+ executionWorkerThreads);
+ }
+
+ @Override
+ protected Class<?>[] getWorkflowImplClasses() {
+ return new Class[] {
+ ProcessWorkUnitsWorkflowImpl.class,
+ NestingExecOfProcessWorkUnitWorkflowImpl.class
+ };
+ }
+
+ @Override
+ protected Object[] getActivityImplInstances() {
+ return new Object[] {
+ new ProcessWorkUnitImpl()
+ };
+ }
+
+ @Override
+ protected WorkerOptions createWorkerOptions() {
+ return WorkerOptions.newBuilder()
+
.setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS))
+
.setMaxConcurrentActivityExecutionSize(this.maxConcurrentActivityExecutionSize)
+
.setMaxConcurrentLocalActivityExecutionSize(this.maxConcurrentLocalActivityExecutionSize)
+
.setMaxConcurrentWorkflowTaskExecutionSize(this.maxConcurrentWorkflowTaskExecutionSize)
+ .build();
+ }
+
+ @Override
+ protected String getTaskQueue() {
+ return ConfigUtils.getString(
+ config,
+ GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE,
+ GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE
+ );
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 3ee45d20eb..969aab7490 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -44,12 +44,28 @@ import org.apache.gobblin.util.ConfigUtils;
/** Worker for the {@link ProcessWorkUnitsWorkflowImpl} super-workflow */
public class WorkFulfillmentWorker extends AbstractTemporalWorker {
public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120; //
TODO: make configurable!
- public int maxExecutionConcurrency;
+ private final int maxConcurrentActivityExecutionSize;
+ private final int maxConcurrentLocalActivityExecutionSize;
+ private final int maxConcurrentWorkflowTaskExecutionSize;
public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient)
{
super(config, workflowClient);
- this.maxExecutionConcurrency = ConfigUtils.getInt(config,
GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
-
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER);
+ int defaultThreadsPerWorker =
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER;
+
+ // Fallback chain: TEMPORAL_NUM_THREADS_PER_WORKER -> DEFAULT
+ int workerThreads = ConfigUtils.getInt(config,
+ GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
+ defaultThreadsPerWorker);
+
+ this.maxConcurrentActivityExecutionSize = ConfigUtils.getInt(config,
+
GobblinTemporalConfigurationKeys.TEMPORAL_MAX_CONCURRENT_ACTIVITY_SIZE,
+ workerThreads);
+ this.maxConcurrentLocalActivityExecutionSize =
ConfigUtils.getInt(config,
+
GobblinTemporalConfigurationKeys.TEMPORAL_MAX_CONCURRENT_LOCAL_ACTIVITY_SIZE,
+ workerThreads);
+ this.maxConcurrentWorkflowTaskExecutionSize =
ConfigUtils.getInt(config,
+
GobblinTemporalConfigurationKeys.TEMPORAL_MAX_CONCURRENT_WORKFLOW_TASK_SIZE,
+ workerThreads);
}
@Override
@@ -69,9 +85,9 @@ public class WorkFulfillmentWorker extends
AbstractTemporalWorker {
return WorkerOptions.newBuilder()
// default is only 1s - WAY TOO SHORT for
`o.a.hadoop.fs.FileSystem#listStatus`!
.setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS))
-
.setMaxConcurrentActivityExecutionSize(this.maxExecutionConcurrency)
-
.setMaxConcurrentLocalActivityExecutionSize(this.maxExecutionConcurrency)
-
.setMaxConcurrentWorkflowTaskExecutionSize(this.maxExecutionConcurrency)
+
.setMaxConcurrentActivityExecutionSize(this.maxConcurrentActivityExecutionSize)
+
.setMaxConcurrentLocalActivityExecutionSize(this.maxConcurrentLocalActivityExecutionSize)
+
.setMaxConcurrentWorkflowTaskExecutionSize(this.maxConcurrentWorkflowTaskExecutionSize)
.build();
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 94e870c283..3fb309a927 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -23,6 +23,7 @@ import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.temporal.api.enums.v1.ParentClosePolicy;
@@ -32,6 +33,7 @@ import io.temporal.workflow.Workflow;
import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.activity.ActivityType;
@@ -174,25 +176,35 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
protected NestingExecWorkflow<WorkUnitClaimCheck>
createProcessingWorkflow(FileSystemJobStateful f,
Map<String, Object> searchAttributes) {
- ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+ Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty());
+ boolean dynamicScalingEnabled =
config.hasPath(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED)
+ &&
config.getBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED);
+
+ ChildWorkflowOptions.Builder childOpts = ChildWorkflowOptions.newBuilder()
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
.setSearchAttributes(searchAttributes)
-
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
- WorkerConfig.of(this).orElse(ConfigFactory.empty())))
- .build();
- // TODO: to incorporate multiple different concrete `NestingExecWorkflow`
sub-workflows in the same super-workflow... shall we use queues?!?!?
- return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
config));
+
+ // Route NestingExecWorkflow (work execution) to the execution task queue
+ if (dynamicScalingEnabled) {
+
childOpts.setTaskQueue(config.hasPath(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE)
+ ?
config.getString(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE)
+ : GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE);
+ }
+
+ return Workflow.newChildWorkflowStub(NestingExecWorkflow.class,
childOpts.build());
}
protected CommitStepWorkflow createCommitStepWorkflow(Map<String, Object>
searchAttributes) {
+ Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty());
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
// TODO: verify to instead use: Policy.PARENT_CLOSE_POLICY_TERMINATE)
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
.setSearchAttributes(searchAttributes)
-
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
- WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
config))
.build();
+ // CommitStepWorkflow inherits default queue from ProcessWorkUnitsWorkflow
parent
return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
}
}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunnerTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunnerTest.java
new file mode 100644
index 0000000000..69772116d2
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunnerTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import
org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
+
+
+/**
+ * Tests for {@link GobblinTemporalTaskRunner} worker initialization logic.
+ */
+public class GobblinTemporalTaskRunnerTest {
+
+ /**
+ * Tests that initializeExecutionWorkers does nothing when dynamic scaling
is disabled.
+ */
+ @Test
+ public void testInitializeExecutionWorkersWhenDynamicScalingDisabled()
throws Exception {
+ Config config = ConfigFactory.empty()
+ .withValue(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
ConfigValueFactory.fromAnyRef(false))
+ .withValue(GobblinTemporalConfigurationKeys.WORKER_CLASS,
+
ConfigValueFactory.fromAnyRef(GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS));
+
+ GobblinTemporalTaskRunner taskRunner = createMockTaskRunner(config);
+ List<TemporalWorker> workers = getWorkersField(taskRunner);
+ int initialWorkerCount = workers.size();
+
+ invokeInitializeExecutionWorkers(taskRunner);
+
+ Assert.assertEquals(workers.size(), initialWorkerCount,
+ "No workers should be added when dynamic scaling is disabled");
+ }
+
+ /**
+ * Tests that initializeExecutionWorkers does nothing when container is
already ExecutionWorker.
+ */
+ @Test
+ public void testInitializeExecutionWorkersWhenAlreadyExecutionWorker()
throws Exception {
+ Config config = ConfigFactory.empty()
+ .withValue(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
ConfigValueFactory.fromAnyRef(true))
+ .withValue(GobblinTemporalConfigurationKeys.WORKER_CLASS,
+
ConfigValueFactory.fromAnyRef(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS));
+
+ GobblinTemporalTaskRunner taskRunner = createMockTaskRunner(config);
+ List<TemporalWorker> workers = getWorkersField(taskRunner);
+ int initialWorkerCount = workers.size();
+
+ invokeInitializeExecutionWorkers(taskRunner);
+
+ Assert.assertEquals(workers.size(), initialWorkerCount,
+ "No workers should be added when container is already
ExecutionWorker");
+ }
+
+ /**
+ * Tests that initializeExecutionWorkers does nothing when dynamic scaling
config is missing.
+ */
+ @Test
+ public void testInitializeExecutionWorkersWhenConfigMissing() throws
Exception {
+ Config config = ConfigFactory.empty()
+ .withValue(GobblinTemporalConfigurationKeys.WORKER_CLASS,
+
ConfigValueFactory.fromAnyRef(GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS));
+
+ GobblinTemporalTaskRunner taskRunner = createMockTaskRunner(config);
+ List<TemporalWorker> workers = getWorkersField(taskRunner);
+ int initialWorkerCount = workers.size();
+
+ invokeInitializeExecutionWorkers(taskRunner);
+
+ Assert.assertEquals(workers.size(), initialWorkerCount,
+ "No workers should be added when dynamic scaling config is missing");
+ }
+
+ /**
+ * Helper to create a mock GobblinTemporalTaskRunner with necessary fields
set.
+ */
+ private GobblinTemporalTaskRunner createMockTaskRunner(Config config) throws
Exception {
+ GobblinTemporalTaskRunner taskRunner =
Mockito.mock(GobblinTemporalTaskRunner.class, Mockito.CALLS_REAL_METHODS);
+
+ // Set clusterConfig field
+ Field clusterConfigField =
GobblinTemporalTaskRunner.class.getDeclaredField("clusterConfig");
+ clusterConfigField.setAccessible(true);
+ clusterConfigField.set(taskRunner, config);
+
+ // Set workers list
+ Field workersField =
GobblinTemporalTaskRunner.class.getDeclaredField("workers");
+ workersField.setAccessible(true);
+ workersField.set(taskRunner, new ArrayList<TemporalWorker>());
+
+ // Mock managedWorkflowServiceStubs
+ ManagedWorkflowServiceStubs mockStubs =
Mockito.mock(ManagedWorkflowServiceStubs.class);
+ Field stubsField =
GobblinTemporalTaskRunner.class.getDeclaredField("managedWorkflowServiceStubs");
+ stubsField.setAccessible(true);
+ stubsField.set(taskRunner, mockStubs);
+
+ // Mock WorkflowClient
+ Mockito.when(mockStubs.getWorkflowServiceStubs()).thenReturn(null);
+
+ return taskRunner;
+ }
+
+ /**
+ * Helper to invoke the private initializeExecutionWorkers method using
reflection.
+ */
+ private void invokeInitializeExecutionWorkers(GobblinTemporalTaskRunner
taskRunner) throws Exception {
+ Method method =
GobblinTemporalTaskRunner.class.getDeclaredMethod("initializeExecutionWorkers");
+ method.setAccessible(true);
+ method.invoke(taskRunner);
+ }
+
+ /**
+ * Helper to get the workers list field using reflection.
+ */
+ @SuppressWarnings("unchecked")
+ private List<TemporalWorker> getWorkersField(GobblinTemporalTaskRunner
taskRunner) throws Exception {
+ Field workersField =
GobblinTemporalTaskRunner.class.getDeclaredField("workers");
+ workersField.setAccessible(true);
+ return (List<TemporalWorker>) workersField.get(taskRunner);
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImplTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImplTest.java
new file mode 100644
index 0000000000..b8cccddb70
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImplTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+
+/**
+ * Tests for {@link AbstractRecommendScalingForWorkUnitsImpl} focusing on
profile overlay creation
+ * and ExecutionWorker configuration for dynamic scaling.
+ */
+public class AbstractRecommendScalingForWorkUnitsImplTest {
+
+ private TestableAbstractRecommendScalingForWorkUnitsImpl scalingImpl;
+ private Properties jobProps;
+
+ @Mock
+ private WorkUnitsSizeSummary mockWorkSummary;
+
+ @Mock
+ private TimeBudget mockTimeBudget;
+
+ /** Handle returned by {@link MockitoAnnotations#openMocks(Object)}; closed after each test. */
+ private AutoCloseable openedMocks;
+
+ /**
+  * Initializes the {@code @Mock} fields, creates a fresh implementation under test and seeds
+  * the job properties with the {@code job.name} / {@code job.id} entries JobState needs.
+  */
+ @BeforeMethod
+ public void setup() {
+   // keep the handle so the mock session can be released in tearDown()
+   openedMocks = MockitoAnnotations.openMocks(this);
+   scalingImpl = new TestableAbstractRecommendScalingForWorkUnitsImpl();
+   jobProps = new Properties();
+   // JobState requires job.name and job.id
+   jobProps.setProperty("job.name", "TestJob");
+   jobProps.setProperty("job.id", "TestJob_123");
+ }
+
+ /**
+  * Releases the mock session opened in {@link #setup()}.
+  *
+  * @throws Exception if closing the mock session fails
+  */
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception {
+   if (openedMocks != null) {
+     openedMocks.close();
+     // drop the handle so a later tearDown cannot close the same session twice
+     openedMocks = null;
+   }
+ }
+
+ /**
+  * Verifies that the derivation computed for a job with no extra settings is based on the
+  * baseline profile and that its overlay adds the ExecutionWorker class as the worker class.
+  */
+ @Test
+ public void testProfileOverlayAlwaysIncludesExecutionWorkerClass() {
+   ProfileDerivation result = scalingImpl.calcProfileDerivation(
+       WorkforceProfiles.BASELINE_NAME, null, "TestSource", new JobState(jobProps));
+
+   Assert.assertNotNull(result);
+   Assert.assertEquals(result.getBasisProfileName(), WorkforceProfiles.BASELINE_NAME);
+
+   ProfileOverlay overlay = result.getOverlay();
+   Assert.assertTrue(overlay instanceof ProfileOverlay.Adding);
+
+   // look for the (worker class -> ExecutionWorker) pair among the overlay additions
+   boolean foundExecutionWorker = false;
+   for (ProfileOverlay.KVPair pair : ((ProfileOverlay.Adding) overlay).getAdditionPairs()) {
+     if (pair.getKey().equals(GobblinTemporalConfigurationKeys.WORKER_CLASS)
+         && pair.getValue().equals(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS)) {
+       foundExecutionWorker = true;
+       break;
+     }
+   }
+
+   Assert.assertTrue(foundExecutionWorker,
+       "Profile overlay should always include ExecutionWorker class");
+ }
+
+ /**
+  * Verifies that an execution memory value set in the job properties shows up in the overlay
+  * under the YARN container-memory key with the same value.
+  */
+ @Test
+ public void testProfileOverlayIncludesExecutionMemoryWhenConfigured() {
+   final String configuredMemoryMb = "32768";
+   jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, configuredMemoryMb);
+
+   ProfileOverlay overlay = scalingImpl.calcProfileDerivation(
+       WorkforceProfiles.BASELINE_NAME, null, "TestSource", new JobState(jobProps)).getOverlay();
+   Assert.assertTrue(overlay instanceof ProfileOverlay.Adding);
+
+   // narrow to container-memory entries first, then compare the value
+   List<ProfileOverlay.KVPair> additions = ((ProfileOverlay.Adding) overlay).getAdditionPairs();
+   boolean memoryPresent = additions.stream()
+       .filter(kv -> kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY))
+       .anyMatch(kv -> kv.getValue().equals(configuredMemoryMb));
+
+   Assert.assertTrue(memoryPresent,
+       "Profile overlay should include execution memory when configured");
+ }
+
+ /**
+  * Verifies that, when no execution memory is set in the job properties, the overlay carries
+  * no container-memory entry.
+  */
+ @Test
+ public void testProfileOverlayOmitsMemoryWhenNotConfigured() {
+   ProfileOverlay overlay = scalingImpl.calcProfileDerivation(
+       WorkforceProfiles.BASELINE_NAME, null, "TestSource", new JobState(jobProps)).getOverlay();
+   Assert.assertTrue(overlay instanceof ProfileOverlay.Adding);
+
+   // scan the additions for any container-memory key; none is expected
+   boolean memoryPresent = false;
+   for (ProfileOverlay.KVPair pair : ((ProfileOverlay.Adding) overlay).getAdditionPairs()) {
+     if (pair.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY)) {
+       memoryPresent = true;
+       break;
+     }
+   }
+
+   Assert.assertFalse(memoryPresent,
+       "Profile overlay should not include memory config when not set (uses baseline)");
+ }
+
+ /**
+  * Verifies that, with execution memory configured, the overlay holds exactly two additions:
+  * one keyed by the worker class and one keyed by the container memory.
+  */
+ @Test
+ public void testProfileOverlayIncludesBothWorkerClassAndMemory() {
+   jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, "65536");
+
+   ProfileDerivation derivation = scalingImpl.calcProfileDerivation(
+       WorkforceProfiles.BASELINE_NAME, null, "TestSource", new JobState(jobProps));
+   ProfileOverlay overlay = derivation.getOverlay();
+   Assert.assertTrue(overlay instanceof ProfileOverlay.Adding);
+
+   List<ProfileOverlay.KVPair> additions = ((ProfileOverlay.Adding) overlay).getAdditionPairs();
+
+   // Should have exactly 2 entries: worker class + memory
+   Assert.assertEquals(additions.size(), 2,
+       "Overlay should have 2 entries when memory is configured");
+
+   boolean workerClassSeen = false;
+   boolean memorySeen = false;
+   for (ProfileOverlay.KVPair pair : additions) {
+     if (pair.getKey().equals(GobblinTemporalConfigurationKeys.WORKER_CLASS)) {
+       workerClassSeen = true;
+     }
+     if (pair.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY)) {
+       memorySeen = true;
+     }
+   }
+
+   Assert.assertTrue(workerClassSeen && memorySeen,
+       "Overlay should contain both worker class and memory");
+ }
+
+ /**
+  * Verifies that the derivation name computed for a plain job state equals
+  * {@code DEFAULT_PROFILE_DERIVATION_NAME}.
+  */
+ @Test
+ public void testProfileDerivationNameUsesDefault() {
+   String actualName = scalingImpl.calcProfileDerivationName(new JobState(jobProps));
+
+   Assert.assertEquals(actualName,
+       AbstractRecommendScalingForWorkUnitsImpl.DEFAULT_PROFILE_DERIVATION_NAME);
+ }
+
+ /**
+  * Verifies that the basis profile name computed for a plain job state is the baseline name.
+  */
+ @Test
+ public void testBasisProfileNameIsAlwaysBaseline() {
+   String actualBasis = scalingImpl.calcBasisProfileName(new JobState(jobProps));
+
+   Assert.assertEquals(actualBasis, WorkforceProfiles.BASELINE_NAME);
+ }
+
+ /**
+  * Verifies that {@code recommendScaling} yields a single directive whose derivation is based
+  * on the baseline profile and whose overlay adds the ExecutionWorker class.
+  */
+ @Test
+ public void testRecommendScalingReturnsDirectiveWithProfileDerivation() {
+   jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, "16384");
+
+   List<ScalingDirective> recommended =
+       scalingImpl.recommendScaling(mockWorkSummary, "TestSource", mockTimeBudget, jobProps);
+
+   Assert.assertNotNull(recommended);
+   Assert.assertEquals(recommended.size(), 1);
+
+   ScalingDirective onlyDirective = recommended.get(0);
+   Assert.assertTrue(onlyDirective.getOptDerivedFrom().isPresent(),
+       "Directive should have profile derivation");
+
+   ProfileDerivation derivation = onlyDirective.getOptDerivedFrom().get();
+   Assert.assertEquals(derivation.getBasisProfileName(), WorkforceProfiles.BASELINE_NAME);
+
+   ProfileOverlay overlay = derivation.getOverlay();
+   Assert.assertTrue(overlay instanceof ProfileOverlay.Adding);
+
+   // look for the (worker class -> ExecutionWorker) pair among the overlay additions
+   boolean foundExecutionWorker = false;
+   for (ProfileOverlay.KVPair pair : ((ProfileOverlay.Adding) overlay).getAdditionPairs()) {
+     if (pair.getKey().equals(GobblinTemporalConfigurationKeys.WORKER_CLASS)
+         && pair.getValue().equals(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS)) {
+       foundExecutionWorker = true;
+       break;
+     }
+   }
+
+   Assert.assertTrue(foundExecutionWorker,
+       "Scaling directive should include ExecutionWorker in profile derivation");
+ }
+
+ /**
+  * Verifies that two job states configured with different execution memory values produce
+  * overlays carrying those respective values under the container-memory key.
+  */
+ @Test
+ public void testDifferentMemoryValuesCreateDifferentOverlays() {
+   // First config with 16GB (reuses the shared job properties)
+   jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, "16384");
+   ProfileDerivation smallDerivation = scalingImpl.calcProfileDerivation(
+       WorkforceProfiles.BASELINE_NAME, null, "TestSource", new JobState(jobProps));
+
+   // Second config with 32GB (independent properties object)
+   Properties largeJobProps = new Properties();
+   largeJobProps.setProperty("job.name", "TestJob");
+   largeJobProps.setProperty("job.id", "TestJob_456");
+   largeJobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, "32768");
+   ProfileDerivation largeDerivation = scalingImpl.calcProfileDerivation(
+       WorkforceProfiles.BASELINE_NAME, null, "TestSource", new JobState(largeJobProps));
+
+   // Cast both overlays up front, then pull the memory value out of each
+   ProfileOverlay.Adding smallOverlay = (ProfileOverlay.Adding) smallDerivation.getOverlay();
+   ProfileOverlay.Adding largeOverlay = (ProfileOverlay.Adding) largeDerivation.getOverlay();
+   String smallMemory = firstContainerMemoryValue(smallOverlay);
+   String largeMemory = firstContainerMemoryValue(largeOverlay);
+
+   Assert.assertEquals(smallMemory, "16384");
+   Assert.assertEquals(largeMemory, "32768");
+   Assert.assertNotEquals(smallMemory, largeMemory,
+       "Different memory configs should produce different overlay values");
+ }
+
+ /**
+  * Returns the value of the first overlay addition keyed by the container-memory key,
+  * or {@code null} when the overlay has no such addition.
+  */
+ private static String firstContainerMemoryValue(ProfileOverlay.Adding overlay) {
+   return overlay.getAdditionPairs().stream()
+       .filter(kv -> kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY))
+       .map(ProfileOverlay.KVPair::getValue)
+       .findFirst()
+       .orElse(null);
+ }
+
+ /**
+  * Minimal concrete subclass of {@link AbstractRecommendScalingForWorkUnitsImpl} used by the
+  * tests; it supplies a constant set point so the abstract class can be instantiated.
+  */
+ private static class TestableAbstractRecommendScalingForWorkUnitsImpl
+     extends AbstractRecommendScalingForWorkUnitsImpl {
+
+   /** Set point returned for every input (i.e. 5 containers). */
+   private static final int FIXED_SET_POINT = 5;
+
+   /** Ignores all arguments and returns the fixed set point. */
+   @Override
+   protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass,
+       TimeBudget timeBudget, JobState jobState) {
+     return FIXED_SET_POINT;
+   }
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorkerTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorkerTest.java
new file mode 100644
index 0000000000..45c114bbcd
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorkerTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.worker;
+
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.worker.WorkerFactory;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+
+/**
+ * Tests for {@link ExecutionWorker} configuration logic.
+ */
+public class ExecutionWorkerTest {
+
+ private MockedStatic<WorkerFactory> mockedWorkerFactory;
+ private WorkerFactory mockFactory;
+
+ /**
+  * Stubs the static {@code WorkerFactory.newInstance(WorkflowClient)} call so that it hands
+  * back a mocked {@link WorkerFactory} while each test runs.
+  */
+ @BeforeMethod
+ public void setUp() {
+   mockFactory = Mockito.mock(WorkerFactory.class);
+   mockedWorkerFactory = Mockito.mockStatic(WorkerFactory.class);
+   // any WorkflowClient argument yields the same mocked factory
+   mockedWorkerFactory
+       .when(() -> WorkerFactory.newInstance(Mockito.any(WorkflowClient.class)))
+       .thenReturn(mockFactory);
+ }
+
+ /**
+  * Closes the static {@link WorkerFactory} mock created in {@link #setUp()}.
+  * {@code alwaysRun = true} asks TestNG to run this even when {@code setUp()} or the test
+  * failed, so the static mock registration is not left open for later tests.
+  */
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+   if (mockedWorkerFactory != null) {
+     mockedWorkerFactory.close();
+     // forget the closed mock so a later tearDown cannot close it a second time
+     mockedWorkerFactory = null;
+   }
+ }
+
+ /**
+  * Expects each of the three concurrency getters to report the value configured under its
+  * own dedicated execution-worker key.
+  */
+ @Test
+ public void testConcurrencyFieldsWithCustomConfig() {
+   final int activitySize = 10;
+   final int localActivitySize = 8;
+   final int workflowTaskSize = 12;
+
+   Config config = ConfigFactory.empty();
+   config = config.withValue(
+       GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_ACTIVITY_SIZE,
+       ConfigValueFactory.fromAnyRef(activitySize));
+   config = config.withValue(
+       GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_LOCAL_ACTIVITY_SIZE,
+       ConfigValueFactory.fromAnyRef(localActivitySize));
+   config = config.withValue(
+       GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_WORKFLOW_TASK_SIZE,
+       ConfigValueFactory.fromAnyRef(workflowTaskSize));
+
+   ExecutionWorker worker = new ExecutionWorker(config, Mockito.mock(WorkflowClient.class));
+
+   Assert.assertEquals(worker.getMaxConcurrentActivityExecutionSize(), activitySize);
+   Assert.assertEquals(worker.getMaxConcurrentLocalActivityExecutionSize(), localActivitySize);
+   Assert.assertEquals(worker.getMaxConcurrentWorkflowTaskExecutionSize(), workflowTaskSize);
+ }
+
+ /**
+  * Expects all three concurrency getters to report the value of
+  * {@code TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER} when it is the only key configured.
+  */
+ @Test
+ public void testConcurrencyFieldsFallbackToExecutionWorkerThreads() {
+   final int threadsPerExecutionWorker = 20;
+
+   Config config = ConfigFactory.empty().withValue(
+       GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER,
+       ConfigValueFactory.fromAnyRef(threadsPerExecutionWorker));
+
+   ExecutionWorker worker = new ExecutionWorker(config, Mockito.mock(WorkflowClient.class));
+
+   Assert.assertEquals(worker.getMaxConcurrentActivityExecutionSize(), threadsPerExecutionWorker);
+   Assert.assertEquals(worker.getMaxConcurrentLocalActivityExecutionSize(), threadsPerExecutionWorker);
+   Assert.assertEquals(worker.getMaxConcurrentWorkflowTaskExecutionSize(), threadsPerExecutionWorker);
+ }
+
+ /**
+  * Expects all three concurrency getters to report the value of
+  * {@code TEMPORAL_NUM_THREADS_PER_WORKER} when it is the only key configured.
+  */
+ @Test
+ public void testConcurrencyFieldsFallbackToWorkerThreads() {
+   final int threadsPerWorker = 25;
+
+   Config config = ConfigFactory.empty().withValue(
+       GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
+       ConfigValueFactory.fromAnyRef(threadsPerWorker));
+
+   ExecutionWorker worker = new ExecutionWorker(config, Mockito.mock(WorkflowClient.class));
+
+   Assert.assertEquals(worker.getMaxConcurrentActivityExecutionSize(), threadsPerWorker);
+   Assert.assertEquals(worker.getMaxConcurrentLocalActivityExecutionSize(), threadsPerWorker);
+   Assert.assertEquals(worker.getMaxConcurrentWorkflowTaskExecutionSize(), threadsPerWorker);
+ }
+
+ /**
+  * Expects all three concurrency getters to report
+  * {@code DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER} when the config is empty.
+  */
+ @Test
+ public void testConcurrencyFieldsWithDefaultValue() {
+   ExecutionWorker worker =
+       new ExecutionWorker(ConfigFactory.empty(), Mockito.mock(WorkflowClient.class));
+
+   final int expected = GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER;
+   Assert.assertEquals(worker.getMaxConcurrentActivityExecutionSize(), expected);
+   Assert.assertEquals(worker.getMaxConcurrentLocalActivityExecutionSize(), expected);
+   Assert.assertEquals(worker.getMaxConcurrentWorkflowTaskExecutionSize(), expected);
+ }
+
+ /**
+  * Expects the activity-specific key to win over the per-execution-worker thread count for
+  * the activity getter, while the other two getters report the thread count.
+  */
+ @Test
+ public void testSpecificConfigTakesPrecedence() {
+   final int threadsPerExecutionWorker = 20;
+   final int activitySize = 10;
+
+   Config config = ConfigFactory.empty();
+   config = config.withValue(
+       GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER,
+       ConfigValueFactory.fromAnyRef(threadsPerExecutionWorker));
+   config = config.withValue(
+       GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_ACTIVITY_SIZE,
+       ConfigValueFactory.fromAnyRef(activitySize));
+
+   ExecutionWorker worker = new ExecutionWorker(config, Mockito.mock(WorkflowClient.class));
+
+   // Activity should use specific config
+   Assert.assertEquals(worker.getMaxConcurrentActivityExecutionSize(), activitySize);
+   // Others should fall back to execution worker threads
+   Assert.assertEquals(worker.getMaxConcurrentLocalActivityExecutionSize(), threadsPerExecutionWorker);
+   Assert.assertEquals(worker.getMaxConcurrentWorkflowTaskExecutionSize(), threadsPerExecutionWorker);
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImplTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImplTest.java
new file mode 100644
index 0000000000..272aca43fd
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImplTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.workflow.ChildWorkflowOptions;
+import io.temporal.workflow.Workflow;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
+import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+
+
+/**
+ * Tests for {@link ProcessWorkUnitsWorkflowImpl} focusing on task queue
routing
+ * and child workflow creation.
+ */
+public class ProcessWorkUnitsWorkflowImplTest {
+
+ private MockedStatic<Workflow> workflowMockedStatic;
+ private MockedStatic<WorkerConfig> workerConfigMockedStatic;
+ private MockedStatic<Help> helpMockedStatic;
+ private ProcessWorkUnitsWorkflowImpl workflow;
+
+ /**
+  * Opens static mocks for {@link Workflow}, {@link WorkerConfig} and {@link Help}, stubs both
+  * stubbed {@code Help.qualifyNamePerExecWithFlowExecId} overloads to return a fixed workflow
+  * ID, and creates the workflow implementation under test.
+  */
+ @BeforeMethod
+ public void setup() {
+   workflowMockedStatic = Mockito.mockStatic(Workflow.class);
+   workerConfigMockedStatic = Mockito.mockStatic(WorkerConfig.class);
+   helpMockedStatic = Mockito.mockStatic(Help.class);
+
+   final String stubbedWorkflowId = "test-workflow-id";
+   // three-argument overload
+   helpMockedStatic
+       .when(() -> Help.qualifyNamePerExecWithFlowExecId(Mockito.anyString(), Mockito.any(), Mockito.any()))
+       .thenReturn(stubbedWorkflowId);
+   // two-argument overload
+   helpMockedStatic
+       .when(() -> Help.qualifyNamePerExecWithFlowExecId(Mockito.anyString(), Mockito.any()))
+       .thenReturn(stubbedWorkflowId);
+
+   workflow = new ProcessWorkUnitsWorkflowImpl();
+ }
+
+ /**
+  * Closes the three static mocks opened in {@link #setup()}.
+  * {@code alwaysRun = true} asks TestNG to run this even when {@code setup()} or the test
+  * failed, so static mock registrations are not left open for later tests.
+  */
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+   // each field is nulled after closing so a later tearDown cannot close it a second time
+   if (workflowMockedStatic != null) {
+     workflowMockedStatic.close();
+     workflowMockedStatic = null;
+   }
+   if (workerConfigMockedStatic != null) {
+     workerConfigMockedStatic.close();
+     workerConfigMockedStatic = null;
+   }
+   if (helpMockedStatic != null) {
+     helpMockedStatic.close();
+     helpMockedStatic = null;
+   }
+ }
+
+ /**
+  * Verifies that the {@link ChildWorkflowOptions} used to create the CommitStepWorkflow stub
+  * carry no explicit task queue and the caller's search attributes, and that the stub is
+  * requested exactly once. Per the original author's note, leaving the queue unset is meant
+  * to let the child inherit the parent workflow's (default) task queue.
+  */
+ @Test
+ public void testCreateCommitStepWorkflowUsesDefaultQueue() {
+   Map<String, Object> attributes = new HashMap<>();
+   attributes.put("test", "value");
+
+   Config emptyConfig = ConfigFactory.empty();
+   workerConfigMockedStatic
+       .when(() -> WorkerConfig.of(Mockito.any()))
+       .thenReturn(java.util.Optional.of(emptyConfig));
+
+   CommitStepWorkflow stubbedChild = Mockito.mock(CommitStepWorkflow.class);
+
+   // Inspect the options at the moment the stub is requested
+   workflowMockedStatic
+       .when(() -> Workflow.newChildWorkflowStub(
+           Mockito.eq(CommitStepWorkflow.class), Mockito.any(ChildWorkflowOptions.class)))
+       .thenAnswer(call -> {
+         ChildWorkflowOptions passedOptions = call.getArgument(1);
+         Assert.assertNull(passedOptions.getTaskQueue(),
+             "CommitStepWorkflow should not have explicit task queue set");
+         Assert.assertEquals(passedOptions.getSearchAttributes(), attributes);
+         return stubbedChild;
+       });
+
+   CommitStepWorkflow created = workflow.createCommitStepWorkflow(attributes);
+
+   Assert.assertNotNull(created);
+   workflowMockedStatic.verify(
+       () -> Workflow.newChildWorkflowStub(
+           Mockito.eq(CommitStepWorkflow.class), Mockito.any(ChildWorkflowOptions.class)),
+       Mockito.times(1));
+ }
+
+ /**
+  * Verifies that a CommitStepWorkflow stub is still returned when {@code WorkerConfig.of}
+  * yields an empty Optional.
+  */
+ @Test
+ public void testCreateCommitStepWorkflowWithoutWorkerConfig() {
+   Map<String, Object> attributes = new HashMap<>();
+
+   workerConfigMockedStatic
+       .when(() -> WorkerConfig.of(Mockito.any()))
+       .thenReturn(java.util.Optional.empty());
+
+   CommitStepWorkflow stubbedChild = Mockito.mock(CommitStepWorkflow.class);
+   workflowMockedStatic
+       .when(() -> Workflow.newChildWorkflowStub(
+           Mockito.eq(CommitStepWorkflow.class), Mockito.any(ChildWorkflowOptions.class)))
+       .thenReturn(stubbedChild);
+
+   CommitStepWorkflow created = workflow.createCommitStepWorkflow(attributes);
+
+   Assert.assertNotNull(created);
+ }
+
+ /**
+  * Verifies that, with dynamic scaling enabled and an execution task queue configured, the
+  * {@link ChildWorkflowOptions} for NestingExecWorkflow name that queue, and that the stub is
+  * requested exactly once.
+  */
+ @Test
+ public void testCreateProcessingWorkflowWithDynamicScalingEnabled() {
+   Map<String, Object> attributes = new HashMap<>();
+   WUProcessingSpec spec = Mockito.mock(WUProcessingSpec.class);
+
+   String rawConfig = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED + "=true\n"
+       + GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE + "=TestExecutionQueue";
+   Config scalingEnabledConfig = ConfigFactory.parseString(rawConfig);
+   workerConfigMockedStatic
+       .when(() -> WorkerConfig.of(Mockito.any()))
+       .thenReturn(java.util.Optional.of(scalingEnabledConfig));
+
+   NestingExecWorkflow stubbedChild = Mockito.mock(NestingExecWorkflow.class);
+
+   // Inspect the options at the moment the stub is requested
+   workflowMockedStatic
+       .when(() -> Workflow.newChildWorkflowStub(
+           Mockito.eq(NestingExecWorkflow.class), Mockito.any(ChildWorkflowOptions.class)))
+       .thenAnswer(call -> {
+         ChildWorkflowOptions passedOptions = call.getArgument(1);
+         Assert.assertEquals(passedOptions.getTaskQueue(), "TestExecutionQueue",
+             "NestingExecWorkflow should use execution queue when dynamic scaling is enabled");
+         return stubbedChild;
+       });
+
+   NestingExecWorkflow created = workflow.createProcessingWorkflow(spec, attributes);
+
+   Assert.assertNotNull(created);
+   workflowMockedStatic.verify(
+       () -> Workflow.newChildWorkflowStub(
+           Mockito.eq(NestingExecWorkflow.class), Mockito.any(ChildWorkflowOptions.class)),
+       Mockito.times(1));
+ }
+
+ /**
+  * Verifies that, with dynamic scaling explicitly disabled, the {@link ChildWorkflowOptions}
+  * for NestingExecWorkflow carry no explicit task queue.
+  */
+ @Test
+ public void testCreateProcessingWorkflowWithDynamicScalingDisabled() {
+   Map<String, Object> attributes = new HashMap<>();
+   WUProcessingSpec spec = Mockito.mock(WUProcessingSpec.class);
+
+   Config scalingDisabledConfig = ConfigFactory.parseString(
+       GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED + "=false");
+   workerConfigMockedStatic
+       .when(() -> WorkerConfig.of(Mockito.any()))
+       .thenReturn(java.util.Optional.of(scalingDisabledConfig));
+
+   NestingExecWorkflow stubbedChild = Mockito.mock(NestingExecWorkflow.class);
+
+   // Inspect the options at the moment the stub is requested
+   workflowMockedStatic
+       .when(() -> Workflow.newChildWorkflowStub(
+           Mockito.eq(NestingExecWorkflow.class), Mockito.any(ChildWorkflowOptions.class)))
+       .thenAnswer(call -> {
+         ChildWorkflowOptions passedOptions = call.getArgument(1);
+         Assert.assertNull(passedOptions.getTaskQueue(),
+             "NestingExecWorkflow should not have explicit task queue when dynamic scaling is disabled");
+         return stubbedChild;
+       });
+
+   NestingExecWorkflow created = workflow.createProcessingWorkflow(spec, attributes);
+
+   Assert.assertNotNull(created);
+ }
+
+ /**
+  * Verifies that, when the worker config has no dynamic scaling setting at all, the
+  * {@link ChildWorkflowOptions} for NestingExecWorkflow carry no explicit task queue.
+  * The returned stub is asserted and the stub request is verified so the test cannot pass
+  * without the in-answer assertion ever having run.
+  */
+ @Test
+ public void testCreateProcessingWorkflowWithoutDynamicScalingConfig() {
+   // Setup
+   Map<String, Object> searchAttributes = new HashMap<>();
+   WUProcessingSpec mockSpec = Mockito.mock(WUProcessingSpec.class);
+
+   Config mockConfig = ConfigFactory.empty();
+
+   workerConfigMockedStatic.when(() -> WorkerConfig.of(Mockito.any()))
+       .thenReturn(java.util.Optional.of(mockConfig));
+
+   NestingExecWorkflow mockNestingWorkflow = Mockito.mock(NestingExecWorkflow.class);
+
+   workflowMockedStatic.when(() -> Workflow.newChildWorkflowStub(
+           Mockito.eq(NestingExecWorkflow.class),
+           Mockito.any(ChildWorkflowOptions.class)))
+       .thenAnswer(invocation -> {
+         ChildWorkflowOptions options = invocation.getArgument(1);
+         // Verify task queue is NOT set (inherits from parent)
+         Assert.assertNull(options.getTaskQueue(),
+             "NestingExecWorkflow should not have explicit task queue when dynamic scaling config is absent");
+         return mockNestingWorkflow;
+       });
+
+   // Execute
+   NestingExecWorkflow result = workflow.createProcessingWorkflow(mockSpec, searchAttributes);
+
+   // Verify: a non-null result plus one recorded call proves the answer above was executed
+   Assert.assertNotNull(result);
+   workflowMockedStatic.verify(() -> Workflow.newChildWorkflowStub(
+       Mockito.eq(NestingExecWorkflow.class),
+       Mockito.any(ChildWorkflowOptions.class)), Mockito.times(1));
+ }
+}