This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e9146ee4e [GOBBLIN-2245] Independent Dynamic Scaling for different 
Activities in Temporal WorkFlow (#4159)
8e9146ee4e is described below

commit 8e9146ee4e19daee303606d33007a81d4f961e73
Author: Agam Pal Singh <[email protected]>
AuthorDate: Thu Jan 15 11:41:40 2026 +0530

    [GOBBLIN-2245] Independent Dynamic Scaling for different Activities in 
Temporal WorkFlow (#4159)
    
    * independent memory and container configuration for temporal stages
    
    * route non execute activities to default queue
    
    * code refactoring
    
    * code cleanup
    
    * fix: spin up 1 container initially by default
    
    * proper queue routing for commit phase
    
    * code cleanup
    
    * - spin up initial container with execution worker
    - unit tests
    
    * - only forward NestingExecWorkflow to execution queue
    
    * removed redundant unit tests
    
    * removed redundant unit tests
    
    * copilot review fixes
    
    * PR review fixes
    
    * undo whitespace change
    
    * PR review comments addressing
    
    * PR review comments addressing
    
    * updated property name
    
    * removed unused import
    
    ---------
    
    Co-authored-by: Agam Pal Singh <[email protected]>
---
 .../temporal/GobblinTemporalConfigurationKeys.java |  22 ++
 .../temporal/cluster/AbstractTemporalWorker.java   |  14 +-
 .../cluster/GobblinTemporalTaskRunner.java         |  37 ++-
 .../AbstractRecommendScalingForWorkUnitsImpl.java  |  32 ++-
 .../temporal/ddm/worker/ExecutionWorker.java       | 107 ++++++++
 .../temporal/ddm/worker/WorkFulfillmentWorker.java |  28 +-
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |  28 +-
 .../cluster/GobblinTemporalTaskRunnerTest.java     | 147 +++++++++++
 ...stractRecommendScalingForWorkUnitsImplTest.java | 284 +++++++++++++++++++++
 .../temporal/ddm/worker/ExecutionWorkerTest.java   | 163 ++++++++++++
 .../impl/ProcessWorkUnitsWorkflowImplTest.java     | 255 ++++++++++++++++++
 11 files changed, 1092 insertions(+), 25 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 0b95b78ef4..bc87fe1f55 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.temporal;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.temporal.ddm.worker.ExecutionWorker;
 import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldJobLauncher;
 import org.apache.gobblin.temporal.workflows.helloworld.HelloWorldWorker;
 
@@ -29,14 +30,20 @@ import 
org.apache.gobblin.temporal.workflows.helloworld.HelloWorldWorker;
 public interface GobblinTemporalConfigurationKeys {
 
   String PREFIX = "gobblin.temporal.";
+  String STAGE_SPECIFIC_PREFIX = PREFIX + "stage.";
 
   String WORKER_CLASS = PREFIX + "worker.class";
   String DEFAULT_WORKER_CLASS = HelloWorldWorker.class.getName();
+  String EXECUTION_WORKER_CLASS = ExecutionWorker.class.getName();
   String GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
   String DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace";
 
   String GOBBLIN_TEMPORAL_TASK_QUEUE = PREFIX + "task.queue.name";
   String DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue";
+
+  // Execution task queue for work execution specialization
+  String EXECUTION_TASK_QUEUE = PREFIX + "execution.task.queue.name";
+  String DEFAULT_EXECUTION_TASK_QUEUE = "GobblinTemporalExecutionQueue";
   String GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX = PREFIX + "job.launcher.";
   String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "class";
   String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = 
HelloWorldJobLauncher.class.getName();
@@ -71,6 +78,17 @@ public interface GobblinTemporalConfigurationKeys {
   int DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS = 1;
   String TEMPORAL_NUM_THREADS_PER_WORKER = PREFIX + "num.threads.per.worker";
   int DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER = 15;
+  String TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER = PREFIX + 
"num.threads.per.execution.worker";
+
+  // Concurrency configs for WorkFulfillmentWorker
+  String TEMPORAL_MAX_CONCURRENT_ACTIVITY_SIZE = PREFIX + 
"max.concurrent.activity.size";
+  String TEMPORAL_MAX_CONCURRENT_LOCAL_ACTIVITY_SIZE = PREFIX + 
"max.concurrent.local.activity.size";
+  String TEMPORAL_MAX_CONCURRENT_WORKFLOW_TASK_SIZE = PREFIX + 
"max.concurrent.workflow.task.size";
+
+  // Concurrency configs for ExecutionWorker
+  String TEMPORAL_EXECUTION_MAX_CONCURRENT_ACTIVITY_SIZE = PREFIX + 
"execution.max.concurrent.activity.size";
+  String TEMPORAL_EXECUTION_MAX_CONCURRENT_LOCAL_ACTIVITY_SIZE = PREFIX + 
"execution.max.concurrent.local.activity.size";
+  String TEMPORAL_EXECUTION_MAX_CONCURRENT_WORKFLOW_TASK_SIZE = PREFIX + 
"execution.max.concurrent.workflow.task.size";
 
   // Configuration key for setting the amortized throughput per worker thread 
per minute
   String TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE = PREFIX + 
"worker.thread.amortized.throughput.per.minute";
@@ -136,4 +154,8 @@ public interface GobblinTemporalConfigurationKeys {
   String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts";
   int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4;
 
+  /**
+   * Memory allocation for execution worker containers.
+   */
+  String WORK_EXECUTION_MEMORY_MB = STAGE_SPECIFIC_PREFIX + 
"workExecution.memory.mb";
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
index 8ab428c418..b569417f0b 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java
@@ -33,16 +33,12 @@ import org.apache.gobblin.util.ConfigUtils;
 /** Basic boilerplate for a {@link TemporalWorker} to register its activity 
and workflow capabilities and listen on a particular queue */
 public abstract class AbstractTemporalWorker implements TemporalWorker {
     private final WorkflowClient workflowClient;
-    private final String queueName;
     private final WorkerFactory workerFactory;
-    private final Config config;
+    protected final Config config;
 
     public AbstractTemporalWorker(Config cfg, WorkflowClient client) {
         config = cfg;
         workflowClient = client;
-        queueName = ConfigUtils.getString(cfg,
-            GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE,
-            
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
 
         // Create a Worker factory that can be used to create Workers that 
poll specific Task Queues.
         workerFactory = WorkerFactory.newInstance(workflowClient);
@@ -52,7 +48,7 @@ public abstract class AbstractTemporalWorker implements 
TemporalWorker {
 
     @Override
     public void start() {
-        Worker worker = workerFactory.newWorker(queueName, 
createWorkerOptions());
+        Worker worker = workerFactory.newWorker(getTaskQueue(), 
createWorkerOptions());
         // This Worker hosts both Workflow and Activity implementations.
         // Workflows are stateful, so you need to supply a type to create 
instances.
         worker.registerWorkflowImplementationTypes(getWorkflowImplClasses());
@@ -77,6 +73,12 @@ public abstract class AbstractTemporalWorker implements 
TemporalWorker {
     /** @return activity instances; NOTE: activities must be stateless and 
thread-safe, so a shared instance is used. */
     protected abstract Object[] getActivityImplInstances();
 
+    protected String getTaskQueue() {
+        return ConfigUtils.getString(config,
+            GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_TASK_QUEUE,
+            
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
+    }
+
     private final void stashWorkerConfig(Config cfg) {
         // stash to associate with...
         WorkerConfig.forWorker(this.getClass(), cfg); // the worker itself
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
index e7cd4315f0..b7a609c51f 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
@@ -238,6 +238,7 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
       for (int i = 0; i < this.numTemporalWorkers; i++) {
         workers.add(initiateWorker());
       }
+      initializeExecutionWorkers();
     }catch (Exception e) {
       logger.info(e + " for initiate workers");
       throw new RuntimeException(e);
@@ -252,8 +253,8 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
     WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(
         managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace);
 
-    String workerClassName = ConfigUtils.getString(clusterConfig,
-        GobblinTemporalConfigurationKeys.WORKER_CLASS, 
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
+    String workerClassName = ConfigUtils.getString(clusterConfig, 
GobblinTemporalConfigurationKeys.WORKER_CLASS,
+        GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
     logger.info("Creating worker - class: '{}'", workerClassName);
     Config workerConfig = clusterConfig;
     TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor(
@@ -263,6 +264,38 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
     return worker;
   }
 
+  private void initializeExecutionWorkers() throws Exception {
+    boolean dynamicScalingEnabled = ConfigUtils.getBoolean(clusterConfig,
+        GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false);
+
+    if (!dynamicScalingEnabled) {
+      return;
+    }
+
+    String workerClassName = ConfigUtils.getString(clusterConfig, 
GobblinTemporalConfigurationKeys.WORKER_CLASS,
+        GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
+    boolean isExecutionWorkerContainer = 
GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS.equals(workerClassName);
+
+    // only the initial container (WorkFulfillment worker) should start an 
additional ExecutionWorker worker
+    if (isExecutionWorkerContainer) {
+      return;
+    }
+
+    logger.info("Starting additional ExecutionWorker in initial container");
+
+    String namespace = ConfigUtils.getString(clusterConfig, 
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
+        GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
+    WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(
+        managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace);
+
+    TemporalWorker executionWorker = 
GobblinConstructorUtils.invokeLongestConstructor(
+        
(Class<TemporalWorker>)Class.forName(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS),
+        clusterConfig, client);
+    executionWorker.start();
+    workers.add(executionWorker);
+    logger.info("Worker started for class: {}", 
GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS);
+  }
+
   private void initMetricReporter() {
     if (this.containerMetrics.isPresent()) {
       try {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
index a0d3fd11e5..9390e66f48 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.temporal.ddm.activity.impl;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -25,6 +26,7 @@ import java.util.Properties;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
 import org.apache.gobblin.temporal.ddm.work.TimeBudget;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
@@ -32,6 +34,7 @@ import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
 import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
 import org.apache.gobblin.temporal.dynamic.ScalingDirective;
 import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 
 
 /**
@@ -62,8 +65,9 @@ public abstract class 
AbstractRecommendScalingForWorkUnitsImpl implements Recomm
   protected abstract int calcDerivationSetPoint(WorkUnitsSizeSummary 
remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState);
 
   protected ProfileDerivation calcProfileDerivation(String basisProfileName, 
WorkUnitsSizeSummary remainingWork, String sourceClass, JobState jobState) {
-    // TODO: implement right-sizing!!! (for now just return unchanged)
-    return new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged());
+    // Create overlay with execution-specific memory and worker class
+    ProfileOverlay overlay = createExecutionWorkerOverlay(jobState);
+    return new ProfileDerivation(basisProfileName, overlay);
   }
 
   protected String calcProfileDerivationName(JobState jobState) {
@@ -72,6 +76,28 @@ public abstract class 
AbstractRecommendScalingForWorkUnitsImpl implements Recomm
   }
 
   protected String calcBasisProfileName(JobState jobState) {
-    return WorkforceProfiles.BASELINE_NAME; // always build upon baseline
+    // Always derive from the global baseline
+    return WorkforceProfiles.BASELINE_NAME;
   }
+
+  private ProfileOverlay createExecutionWorkerOverlay(JobState jobState) {
+    List<ProfileOverlay.KVPair> overlayPairs = new ArrayList<>();
+
+    // Add execution-specific memory if configured (overrides baseline memory)
+    if 
(jobState.contains(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB)) {
+      overlayPairs.add(new ProfileOverlay.KVPair(
+          GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY,
+          
jobState.getProp(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB)
+      ));
+    }
+
+    // Add ExecutionWorker class to ensure correct task queue routing
+    overlayPairs.add(new ProfileOverlay.KVPair(
+        GobblinTemporalConfigurationKeys.WORKER_CLASS,
+        GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS
+    ));
+
+    return new ProfileOverlay.Adding(overlayPairs);
+  }
+
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java
new file mode 100644
index 0000000000..452d23b1ad
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorker.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.worker;
+
+import java.util.concurrent.TimeUnit;
+
+import com.typesafe.config.Config;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.worker.WorkerOptions;
+import lombok.AccessLevel;
+import lombok.Getter;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
+import 
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
+import 
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Specialized worker for Work Execution stage.
+ * This worker only registers activities for:
+ * - ProcessWorkUnit (Work Execution)
+ *
+ * Runs on containers with stage-specific memory for work execution operations.
+ * Polls the execution task queue to ensure activities run on 
appropriately-sized containers.
+ */
+public class ExecutionWorker extends AbstractTemporalWorker {
+    public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120;
+    @Getter(AccessLevel.PACKAGE)
+    private final int maxConcurrentActivityExecutionSize;
+    @Getter(AccessLevel.PACKAGE)
+    private final int maxConcurrentLocalActivityExecutionSize;
+    @Getter(AccessLevel.PACKAGE)
+    private final int maxConcurrentWorkflowTaskExecutionSize;
+
+    public ExecutionWorker(Config config, WorkflowClient workflowClient) {
+        super(config, workflowClient);
+        int defaultThreadsPerWorker = 
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER;
+
+        // Fallback chain: TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER -> 
TEMPORAL_NUM_THREADS_PER_WORKER -> DEFAULT
+        int executionWorkerThreads = ConfigUtils.getInt(config,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER,
+            ConfigUtils.getInt(config, 
GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER, 
defaultThreadsPerWorker));
+
+        this.maxConcurrentActivityExecutionSize = ConfigUtils.getInt(config,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_ACTIVITY_SIZE,
+            executionWorkerThreads);
+        this.maxConcurrentLocalActivityExecutionSize = 
ConfigUtils.getInt(config,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_LOCAL_ACTIVITY_SIZE,
+            executionWorkerThreads);
+        this.maxConcurrentWorkflowTaskExecutionSize = 
ConfigUtils.getInt(config,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_WORKFLOW_TASK_SIZE,
+            executionWorkerThreads);
+    }
+
+    @Override
+    protected Class<?>[] getWorkflowImplClasses() {
+        return new Class[] {
+            ProcessWorkUnitsWorkflowImpl.class,
+            NestingExecOfProcessWorkUnitWorkflowImpl.class
+        };
+    }
+
+    @Override
+    protected Object[] getActivityImplInstances() {
+        return new Object[] {
+            new ProcessWorkUnitImpl()
+        };
+    }
+
+    @Override
+    protected WorkerOptions createWorkerOptions() {
+        return WorkerOptions.newBuilder()
+            
.setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS))
+            
.setMaxConcurrentActivityExecutionSize(this.maxConcurrentActivityExecutionSize)
+            
.setMaxConcurrentLocalActivityExecutionSize(this.maxConcurrentLocalActivityExecutionSize)
+            
.setMaxConcurrentWorkflowTaskExecutionSize(this.maxConcurrentWorkflowTaskExecutionSize)
+            .build();
+    }
+
+    @Override
+    protected String getTaskQueue() {
+        return ConfigUtils.getString(
+            config,
+            GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE,
+            GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE
+        );
+    }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 3ee45d20eb..969aab7490 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -44,12 +44,28 @@ import org.apache.gobblin.util.ConfigUtils;
 /** Worker for the {@link ProcessWorkUnitsWorkflowImpl} super-workflow */
 public class WorkFulfillmentWorker extends AbstractTemporalWorker {
     public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120; // 
TODO: make configurable!
-    public int maxExecutionConcurrency;
+    private final int maxConcurrentActivityExecutionSize;
+    private final int maxConcurrentLocalActivityExecutionSize;
+    private final int maxConcurrentWorkflowTaskExecutionSize;
 
     public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient) 
{
         super(config, workflowClient);
-        this.maxExecutionConcurrency = ConfigUtils.getInt(config, 
GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
-            
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER);
+        int defaultThreadsPerWorker = 
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER;
+
+        // Fallback chain: TEMPORAL_NUM_THREADS_PER_WORKER -> DEFAULT
+        int workerThreads = ConfigUtils.getInt(config,
+            GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
+            defaultThreadsPerWorker);
+
+        this.maxConcurrentActivityExecutionSize = ConfigUtils.getInt(config,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_MAX_CONCURRENT_ACTIVITY_SIZE,
+            workerThreads);
+        this.maxConcurrentLocalActivityExecutionSize = 
ConfigUtils.getInt(config,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_MAX_CONCURRENT_LOCAL_ACTIVITY_SIZE,
+            workerThreads);
+        this.maxConcurrentWorkflowTaskExecutionSize = 
ConfigUtils.getInt(config,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_MAX_CONCURRENT_WORKFLOW_TASK_SIZE,
+            workerThreads);
     }
 
     @Override
@@ -69,9 +85,9 @@ public class WorkFulfillmentWorker extends 
AbstractTemporalWorker {
         return WorkerOptions.newBuilder()
             // default is only 1s - WAY TOO SHORT for 
`o.a.hadoop.fs.FileSystem#listStatus`!
             
.setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS))
-            
.setMaxConcurrentActivityExecutionSize(this.maxExecutionConcurrency)
-            
.setMaxConcurrentLocalActivityExecutionSize(this.maxExecutionConcurrency)
-            
.setMaxConcurrentWorkflowTaskExecutionSize(this.maxExecutionConcurrency)
+            
.setMaxConcurrentActivityExecutionSize(this.maxConcurrentActivityExecutionSize)
+            
.setMaxConcurrentLocalActivityExecutionSize(this.maxConcurrentLocalActivityExecutionSize)
+            
.setMaxConcurrentWorkflowTaskExecutionSize(this.maxConcurrentWorkflowTaskExecutionSize)
             .build();
     }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 94e870c283..3fb309a927 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 
 import lombok.extern.slf4j.Slf4j;
 
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.api.enums.v1.ParentClosePolicy;
@@ -32,6 +33,7 @@ import io.temporal.workflow.Workflow;
 
 import org.apache.gobblin.metrics.opentelemetry.GobblinOpenTelemetryMetrics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import org.apache.gobblin.temporal.ddm.activity.ActivityType;
@@ -174,25 +176,35 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
 
   protected NestingExecWorkflow<WorkUnitClaimCheck> 
createProcessingWorkflow(FileSystemJobStateful f,
       Map<String, Object> searchAttributes) {
-    ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+    Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty());
+    boolean dynamicScalingEnabled = 
config.hasPath(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED)
+        && 
config.getBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED);
+
+    ChildWorkflowOptions.Builder childOpts = ChildWorkflowOptions.newBuilder()
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
         .setSearchAttributes(searchAttributes)
-        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f,
-            WorkerConfig.of(this).orElse(ConfigFactory.empty())))
-        .build();
-    // TODO: to incorporate multiple different concrete `NestingExecWorkflow` 
sub-workflows in the same super-workflow... shall we use queues?!?!?
-    return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
+        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(CHILD_WORKFLOW_ID_BASE, f, 
config));
+
+    // Route NestingExecWorkflow (work execution) to execution
+    if (dynamicScalingEnabled) {
+      
childOpts.setTaskQueue(config.hasPath(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE)
+          ? 
config.getString(GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE)
+          : GobblinTemporalConfigurationKeys.DEFAULT_EXECUTION_TASK_QUEUE);
+    }
+
+    return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, 
childOpts.build());
   }
 
   protected CommitStepWorkflow createCommitStepWorkflow(Map<String, Object> 
searchAttributes) {
+    Config config = WorkerConfig.of(this).orElse(ConfigFactory.empty());
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
         // TODO: verify to instead use:  Policy.PARENT_CLOSE_POLICY_TERMINATE)
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
         .setSearchAttributes(searchAttributes)
-        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
-            WorkerConfig.of(this).orElse(ConfigFactory.empty())))
+        
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
 config))
         .build();
 
+    // CommitStepWorkflow inherits default queue from ProcessWorkUnitsWorkflow 
parent
     return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
   }
 }
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunnerTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunnerTest.java
new file mode 100644
index 0000000000..69772116d2
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunnerTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import 
org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
+
+
+/**
+ * Tests for {@link GobblinTemporalTaskRunner} worker initialization logic.
+ */
+public class GobblinTemporalTaskRunnerTest {
+
+  /**
+   * Tests that initializeExecutionWorkers does nothing when dynamic scaling 
is disabled.
+   */
+  @Test
+  public void testInitializeExecutionWorkersWhenDynamicScalingDisabled() 
throws Exception {
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, 
ConfigValueFactory.fromAnyRef(false))
+        .withValue(GobblinTemporalConfigurationKeys.WORKER_CLASS,
+            
ConfigValueFactory.fromAnyRef(GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS));
+
+    GobblinTemporalTaskRunner taskRunner = createMockTaskRunner(config);
+    List<TemporalWorker> workers = getWorkersField(taskRunner);
+    int initialWorkerCount = workers.size();
+
+    invokeInitializeExecutionWorkers(taskRunner);
+
+    Assert.assertEquals(workers.size(), initialWorkerCount,
+        "No workers should be added when dynamic scaling is disabled");
+  }
+
+  /**
+   * Tests that initializeExecutionWorkers does nothing when container is 
already ExecutionWorker.
+   */
+  @Test
+  public void testInitializeExecutionWorkersWhenAlreadyExecutionWorker() 
throws Exception {
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, 
ConfigValueFactory.fromAnyRef(true))
+        .withValue(GobblinTemporalConfigurationKeys.WORKER_CLASS,
+            
ConfigValueFactory.fromAnyRef(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS));
+
+    GobblinTemporalTaskRunner taskRunner = createMockTaskRunner(config);
+    List<TemporalWorker> workers = getWorkersField(taskRunner);
+    int initialWorkerCount = workers.size();
+
+    invokeInitializeExecutionWorkers(taskRunner);
+
+    Assert.assertEquals(workers.size(), initialWorkerCount,
+        "No workers should be added when container is already 
ExecutionWorker");
+  }
+
+  /**
+   * Tests that initializeExecutionWorkers does nothing when dynamic scaling 
config is missing.
+   */
+  @Test
+  public void testInitializeExecutionWorkersWhenConfigMissing() throws 
Exception {
+    Config config = ConfigFactory.empty()
+        .withValue(GobblinTemporalConfigurationKeys.WORKER_CLASS,
+            
ConfigValueFactory.fromAnyRef(GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS));
+
+    GobblinTemporalTaskRunner taskRunner = createMockTaskRunner(config);
+    List<TemporalWorker> workers = getWorkersField(taskRunner);
+    int initialWorkerCount = workers.size();
+
+    invokeInitializeExecutionWorkers(taskRunner);
+
+    Assert.assertEquals(workers.size(), initialWorkerCount,
+        "No workers should be added when dynamic scaling config is missing");
+  }
+
+  /**
+   * Helper to create a mock GobblinTemporalTaskRunner with necessary fields 
set.
+   */
+  private GobblinTemporalTaskRunner createMockTaskRunner(Config config) throws 
Exception {
+    GobblinTemporalTaskRunner taskRunner = 
Mockito.mock(GobblinTemporalTaskRunner.class, Mockito.CALLS_REAL_METHODS);
+
+    // Set clusterConfig field
+    Field clusterConfigField = 
GobblinTemporalTaskRunner.class.getDeclaredField("clusterConfig");
+    clusterConfigField.setAccessible(true);
+    clusterConfigField.set(taskRunner, config);
+
+    // Set workers list
+    Field workersField = 
GobblinTemporalTaskRunner.class.getDeclaredField("workers");
+    workersField.setAccessible(true);
+    workersField.set(taskRunner, new ArrayList<TemporalWorker>());
+
+    // Mock managedWorkflowServiceStubs
+    ManagedWorkflowServiceStubs mockStubs = 
Mockito.mock(ManagedWorkflowServiceStubs.class);
+    Field stubsField = 
GobblinTemporalTaskRunner.class.getDeclaredField("managedWorkflowServiceStubs");
+    stubsField.setAccessible(true);
+    stubsField.set(taskRunner, mockStubs);
+
+    // Mock WorkflowClient
+    Mockito.when(mockStubs.getWorkflowServiceStubs()).thenReturn(null);
+
+    return taskRunner;
+  }
+
+  /**
+   * Helper to invoke the private initializeExecutionWorkers method using 
reflection.
+   */
+  private void invokeInitializeExecutionWorkers(GobblinTemporalTaskRunner 
taskRunner) throws Exception {
+    Method method = 
GobblinTemporalTaskRunner.class.getDeclaredMethod("initializeExecutionWorkers");
+    method.setAccessible(true);
+    method.invoke(taskRunner);
+  }
+
+  /**
+   * Helper to get the workers list field using reflection.
+   */
+  @SuppressWarnings("unchecked")
+  private List<TemporalWorker> getWorkersField(GobblinTemporalTaskRunner 
taskRunner) throws Exception {
+    Field workersField = 
GobblinTemporalTaskRunner.class.getDeclaredField("workers");
+    workersField.setAccessible(true);
+    return (List<TemporalWorker>) workersField.get(taskRunner);
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImplTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImplTest.java
new file mode 100644
index 0000000000..b8cccddb70
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImplTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+
+/**
+ * Tests for {@link AbstractRecommendScalingForWorkUnitsImpl} focusing on 
profile overlay creation
+ * and ExecutionWorker configuration for dynamic scaling.
+ */
+public class AbstractRecommendScalingForWorkUnitsImplTest {
+
+  private TestableAbstractRecommendScalingForWorkUnitsImpl scalingImpl;
+  private Properties jobProps;
+  
+  @Mock
+  private WorkUnitsSizeSummary mockWorkSummary;
+  
+  @Mock
+  private TimeBudget mockTimeBudget;
+
+  @BeforeMethod
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+    scalingImpl = new TestableAbstractRecommendScalingForWorkUnitsImpl();
+    jobProps = new Properties();
+    // JobState requires job.name and job.id
+    jobProps.setProperty("job.name", "TestJob");
+    jobProps.setProperty("job.id", "TestJob_123");
+  }
+
+  /**
+   * Tests that ExecutionWorker class is always added to profile overlay.
+   */
+  @Test
+  public void testProfileOverlayAlwaysIncludesExecutionWorkerClass() {
+    JobState jobState = new JobState(jobProps);
+    
+    ProfileDerivation derivation = scalingImpl.calcProfileDerivation(
+        WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState);
+
+    Assert.assertNotNull(derivation);
+    Assert.assertEquals(derivation.getBasisProfileName(), 
WorkforceProfiles.BASELINE_NAME);
+    
+    ProfileOverlay overlay = derivation.getOverlay();
+    Assert.assertTrue(overlay instanceof ProfileOverlay.Adding);
+    
+    // Verify ExecutionWorker class is in overlay
+    ProfileOverlay.Adding addingOverlay = (ProfileOverlay.Adding) overlay;
+    boolean hasExecutionWorkerClass = addingOverlay.getAdditionPairs().stream()
+        .anyMatch(kv -> 
kv.getKey().equals(GobblinTemporalConfigurationKeys.WORKER_CLASS)
+            && 
kv.getValue().equals(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS));
+    
+    Assert.assertTrue(hasExecutionWorkerClass,
+        "Profile overlay should always include ExecutionWorker class");
+  }
+
+  /**
+   * Tests that execution-specific memory is added to overlay when configured.
+   */
+  @Test
+  public void testProfileOverlayIncludesExecutionMemoryWhenConfigured() {
+    String executionMemory = "32768";
+    
jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, 
executionMemory);
+    JobState jobState = new JobState(jobProps);
+    
+    ProfileDerivation derivation = scalingImpl.calcProfileDerivation(
+        WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState);
+
+    ProfileOverlay overlay = derivation.getOverlay();
+    Assert.assertTrue(overlay instanceof ProfileOverlay.Adding);
+    
+    ProfileOverlay.Adding addingOverlay = (ProfileOverlay.Adding) overlay;
+    
+    // Verify memory is in overlay
+    boolean hasMemoryConfig = addingOverlay.getAdditionPairs().stream()
+        .anyMatch(kv -> 
kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY)
+            && kv.getValue().equals(executionMemory));
+    
+    Assert.assertTrue(hasMemoryConfig,
+        "Profile overlay should include execution memory when configured");
+  }
+
+  /**
+   * Tests that memory is not added to overlay when not configured (falls back 
to baseline).
+   */
+  @Test
+  public void testProfileOverlayOmitsMemoryWhenNotConfigured() {
+    JobState jobState = new JobState(jobProps);
+    
+    ProfileDerivation derivation = scalingImpl.calcProfileDerivation(
+        WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState);
+
+    ProfileOverlay overlay = derivation.getOverlay();
+    Assert.assertTrue(overlay instanceof ProfileOverlay.Adding);
+    
+    ProfileOverlay.Adding addingOverlay = (ProfileOverlay.Adding) overlay;
+    
+    // Verify memory is NOT in overlay (will use baseline memory)
+    boolean hasMemoryConfig = addingOverlay.getAdditionPairs().stream()
+        .anyMatch(kv -> 
kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY));
+    
+    Assert.assertFalse(hasMemoryConfig,
+        "Profile overlay should not include memory config when not set (uses 
baseline)");
+  }
+
+  /**
+   * Tests that overlay contains both ExecutionWorker class and memory when 
both configured.
+   */
+  @Test
+  public void testProfileOverlayIncludesBothWorkerClassAndMemory() {
+    String executionMemory = "65536";
+    
jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, 
executionMemory);
+    JobState jobState = new JobState(jobProps);
+    
+    ProfileDerivation derivation = scalingImpl.calcProfileDerivation(
+        WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState);
+
+    ProfileOverlay overlay = derivation.getOverlay();
+    Assert.assertTrue(overlay instanceof ProfileOverlay.Adding);
+    
+    ProfileOverlay.Adding addingOverlay = (ProfileOverlay.Adding) overlay;
+    List<ProfileOverlay.KVPair> kvPairs = addingOverlay.getAdditionPairs();
+    
+    // Should have exactly 2 entries: worker class + memory
+    Assert.assertEquals(kvPairs.size(), 2,
+        "Overlay should have 2 entries when memory is configured");
+    
+    boolean hasWorkerClass = kvPairs.stream()
+        .anyMatch(kv -> 
kv.getKey().equals(GobblinTemporalConfigurationKeys.WORKER_CLASS));
+    boolean hasMemory = kvPairs.stream()
+        .anyMatch(kv -> 
kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY));
+    
+    Assert.assertTrue(hasWorkerClass && hasMemory,
+        "Overlay should contain both worker class and memory");
+  }
+
+  /**
+   * Tests that profile derivation name uses default.
+   */
+  @Test
+  public void testProfileDerivationNameUsesDefault() {
+    JobState jobState = new JobState(jobProps);
+    
+    String derivationName = scalingImpl.calcProfileDerivationName(jobState);
+    
+    Assert.assertEquals(derivationName, 
AbstractRecommendScalingForWorkUnitsImpl.DEFAULT_PROFILE_DERIVATION_NAME);
+  }
+
+  /**
+   * Tests that basis profile name is always baseline.
+   */
+  @Test
+  public void testBasisProfileNameIsAlwaysBaseline() {
+    JobState jobState = new JobState(jobProps);
+    
+    String basisProfileName = scalingImpl.calcBasisProfileName(jobState);
+    
+    Assert.assertEquals(basisProfileName, WorkforceProfiles.BASELINE_NAME);
+  }
+
+  /**
+   * Tests that recommendScaling returns a ScalingDirective with correct 
profile derivation.
+   */
+  @Test
+  public void testRecommendScalingReturnsDirectiveWithProfileDerivation() {
+    String executionMemory = "16384";
+    
jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, 
executionMemory);
+    
+    List<ScalingDirective> directives = scalingImpl.recommendScaling(
+        mockWorkSummary, "TestSource", mockTimeBudget, jobProps);
+
+    Assert.assertNotNull(directives);
+    Assert.assertEquals(directives.size(), 1);
+    
+    ScalingDirective directive = directives.get(0);
+    Assert.assertTrue(directive.getOptDerivedFrom().isPresent(),
+        "Directive should have profile derivation");
+    
+    ProfileDerivation derivation = directive.getOptDerivedFrom().get();
+    Assert.assertEquals(derivation.getBasisProfileName(), 
WorkforceProfiles.BASELINE_NAME);
+    
+    // Verify overlay has ExecutionWorker class
+    ProfileOverlay overlay = derivation.getOverlay();
+    Assert.assertTrue(overlay instanceof ProfileOverlay.Adding);
+    
+    ProfileOverlay.Adding addingOverlay = (ProfileOverlay.Adding) overlay;
+    boolean hasExecutionWorker = addingOverlay.getAdditionPairs().stream()
+        .anyMatch(kv -> 
kv.getKey().equals(GobblinTemporalConfigurationKeys.WORKER_CLASS)
+            && 
kv.getValue().equals(GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS));
+    
+    Assert.assertTrue(hasExecutionWorker,
+        "Scaling directive should include ExecutionWorker in profile 
derivation");
+  }
+
+  /**
+   * Tests that different memory values create different overlays.
+   */
+  @Test
+  public void testDifferentMemoryValuesCreateDifferentOverlays() {
+    // First config with 16GB
+    
jobProps.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB, 
"16384");
+    JobState jobState1 = new JobState(jobProps);
+    ProfileDerivation derivation1 = scalingImpl.calcProfileDerivation(
+        WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState1);
+
+    // Second config with 32GB
+    Properties jobProps2 = new Properties();
+    jobProps2.setProperty("job.name", "TestJob");
+    jobProps2.setProperty("job.id", "TestJob_456");
+    
jobProps2.setProperty(GobblinTemporalConfigurationKeys.WORK_EXECUTION_MEMORY_MB,
 "32768");
+    JobState jobState2 = new JobState(jobProps2);
+    ProfileDerivation derivation2 = scalingImpl.calcProfileDerivation(
+        WorkforceProfiles.BASELINE_NAME, null, "TestSource", jobState2);
+
+    // Extract memory values from overlays
+    ProfileOverlay.Adding overlay1 = (ProfileOverlay.Adding) 
derivation1.getOverlay();
+    ProfileOverlay.Adding overlay2 = (ProfileOverlay.Adding) 
derivation2.getOverlay();
+    
+    String memory1 = overlay1.getAdditionPairs().stream()
+        .filter(kv -> 
kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY))
+        .map(kv -> kv.getValue())
+        .findFirst()
+        .orElse(null);
+    
+    String memory2 = overlay2.getAdditionPairs().stream()
+        .filter(kv -> 
kv.getKey().equals(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY))
+        .map(kv -> kv.getValue())
+        .findFirst()
+        .orElse(null);
+    
+    Assert.assertEquals(memory1, "16384");
+    Assert.assertEquals(memory2, "32768");
+    Assert.assertNotEquals(memory1, memory2,
+        "Different memory configs should produce different overlay values");
+  }
+
+  /**
+   * Testable concrete implementation of 
AbstractRecommendScalingForWorkUnitsImpl.
+   */
+  private static class TestableAbstractRecommendScalingForWorkUnitsImpl 
+      extends AbstractRecommendScalingForWorkUnitsImpl {
+    
+    @Override
+    protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, 
String sourceClass,
+        TimeBudget timeBudget, JobState jobState) {
+      // Simple test implementation: return 5 containers
+      return 5;
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorkerTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorkerTest.java
new file mode 100644
index 0000000000..45c114bbcd
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/worker/ExecutionWorkerTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.worker;
+
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.worker.WorkerFactory;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+
+/**
+ * Tests for {@link ExecutionWorker} configuration logic.
+ */
+public class ExecutionWorkerTest {
+  
+  private MockedStatic<WorkerFactory> mockedWorkerFactory;
+  private WorkerFactory mockFactory;
+
+  @BeforeMethod
+  public void setUp() {
+    // Mock the static WorkerFactory.newInstance() method
+    mockFactory = Mockito.mock(WorkerFactory.class);
+    mockedWorkerFactory = Mockito.mockStatic(WorkerFactory.class);
+    mockedWorkerFactory.when(() -> 
WorkerFactory.newInstance(Mockito.any(WorkflowClient.class)))
+        .thenReturn(mockFactory);
+  }
+
+  @AfterMethod
+  public void tearDown() {
+    if (mockedWorkerFactory != null) {
+      mockedWorkerFactory.close();
+    }
+  }
+
+  /**
+   * Tests that concurrency fields are initialized with custom values from 
config.
+   */
+  @Test
+  public void testConcurrencyFieldsWithCustomConfig() {
+    int customActivityConcurrency = 10;
+    int customLocalActivityConcurrency = 8;
+    int customWorkflowTaskConcurrency = 12;
+    
+    Config config = ConfigFactory.empty()
+        
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_ACTIVITY_SIZE,
+            ConfigValueFactory.fromAnyRef(customActivityConcurrency))
+        
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_LOCAL_ACTIVITY_SIZE,
+            ConfigValueFactory.fromAnyRef(customLocalActivityConcurrency))
+        
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_WORKFLOW_TASK_SIZE,
+            ConfigValueFactory.fromAnyRef(customWorkflowTaskConcurrency));
+    
+    WorkflowClient mockClient = Mockito.mock(WorkflowClient.class);
+    ExecutionWorker worker = new ExecutionWorker(config, mockClient);
+    
+    Assert.assertEquals(worker.getMaxConcurrentActivityExecutionSize(), 
customActivityConcurrency);
+    Assert.assertEquals(worker.getMaxConcurrentLocalActivityExecutionSize(), 
customLocalActivityConcurrency);
+    Assert.assertEquals(worker.getMaxConcurrentWorkflowTaskExecutionSize(), 
customWorkflowTaskConcurrency);
+  }
+
+  /**
+   * Tests that concurrency fields fall back to 
TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER.
+   */
+  @Test
+  public void testConcurrencyFieldsFallbackToExecutionWorkerThreads() {
+    int executionWorkerThreads = 20;
+    
+    Config config = ConfigFactory.empty()
+        
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER,
+            ConfigValueFactory.fromAnyRef(executionWorkerThreads));
+    
+    WorkflowClient mockClient = Mockito.mock(WorkflowClient.class);
+    ExecutionWorker worker = new ExecutionWorker(config, mockClient);
+    
+    Assert.assertEquals(worker.getMaxConcurrentActivityExecutionSize(), 
executionWorkerThreads);
+    Assert.assertEquals(worker.getMaxConcurrentLocalActivityExecutionSize(), 
executionWorkerThreads);
+    Assert.assertEquals(worker.getMaxConcurrentWorkflowTaskExecutionSize(), 
executionWorkerThreads);
+  }
+
+  /**
+   * Tests that concurrency fields fall back to 
TEMPORAL_NUM_THREADS_PER_WORKER.
+   */
+  @Test
+  public void testConcurrencyFieldsFallbackToWorkerThreads() {
+    int workerThreads = 25;
+    
+    Config config = ConfigFactory.empty()
+        
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
+            ConfigValueFactory.fromAnyRef(workerThreads));
+    
+    WorkflowClient mockClient = Mockito.mock(WorkflowClient.class);
+    ExecutionWorker worker = new ExecutionWorker(config, mockClient);
+    
+    Assert.assertEquals(worker.getMaxConcurrentActivityExecutionSize(), 
workerThreads);
+    Assert.assertEquals(worker.getMaxConcurrentLocalActivityExecutionSize(), 
workerThreads);
+    Assert.assertEquals(worker.getMaxConcurrentWorkflowTaskExecutionSize(), 
workerThreads);
+  }
+
+  /**
+   * Tests that concurrency fields use default value when no config is set.
+   */
+  @Test
+  public void testConcurrencyFieldsWithDefaultValue() {
+    Config config = ConfigFactory.empty();
+    
+    WorkflowClient mockClient = Mockito.mock(WorkflowClient.class);
+    ExecutionWorker worker = new ExecutionWorker(config, mockClient);
+    
+    int defaultValue = 
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER;
+    Assert.assertEquals(worker.getMaxConcurrentActivityExecutionSize(), 
defaultValue);
+    Assert.assertEquals(worker.getMaxConcurrentLocalActivityExecutionSize(), 
defaultValue);
+    Assert.assertEquals(worker.getMaxConcurrentWorkflowTaskExecutionSize(), 
defaultValue);
+  }
+
+  /**
+   * Tests that specific configs take precedence over base execution worker 
config.
+   */
+  @Test
+  public void testSpecificConfigTakesPrecedence() {
+    int executionWorkerThreads = 20;
+    int specificActivityConcurrency = 10;
+    
+    Config config = ConfigFactory.empty()
+        
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER,
+            ConfigValueFactory.fromAnyRef(executionWorkerThreads))
+        
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_EXECUTION_MAX_CONCURRENT_ACTIVITY_SIZE,
+            ConfigValueFactory.fromAnyRef(specificActivityConcurrency));
+    
+    WorkflowClient mockClient = Mockito.mock(WorkflowClient.class);
+    ExecutionWorker worker = new ExecutionWorker(config, mockClient);
+    
+    // Activity should use specific config
+    Assert.assertEquals(worker.getMaxConcurrentActivityExecutionSize(), 
specificActivityConcurrency);
+    // Others should fall back to execution worker threads
+    Assert.assertEquals(worker.getMaxConcurrentLocalActivityExecutionSize(), 
executionWorkerThreads);
+    Assert.assertEquals(worker.getMaxConcurrentWorkflowTaskExecutionSize(), 
executionWorkerThreads);
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImplTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImplTest.java
new file mode 100644
index 0000000000..272aca43fd
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImplTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import io.temporal.workflow.ChildWorkflowOptions;
+import io.temporal.workflow.Workflow;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
+import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
+
+
+/**
+ * Tests for {@link ProcessWorkUnitsWorkflowImpl} focusing on task queue 
routing
+ * and child workflow creation.
+ */
+public class ProcessWorkUnitsWorkflowImplTest {
+
+  private MockedStatic<Workflow> workflowMockedStatic;
+  private MockedStatic<WorkerConfig> workerConfigMockedStatic;
+  private MockedStatic<Help> helpMockedStatic;
+  private ProcessWorkUnitsWorkflowImpl workflow;
+
+  @BeforeMethod
+  public void setup() {
+    workflowMockedStatic = Mockito.mockStatic(Workflow.class);
+    workerConfigMockedStatic = Mockito.mockStatic(WorkerConfig.class);
+    helpMockedStatic = Mockito.mockStatic(Help.class);
+
+    // Mock Help.qualifyNamePerExecWithFlowExecId to return a simple workflow 
ID
+    helpMockedStatic.when(() -> 
Help.qualifyNamePerExecWithFlowExecId(Mockito.anyString(), Mockito.any(), 
Mockito.any()))
+        .thenReturn("test-workflow-id");
+    helpMockedStatic.when(() -> 
Help.qualifyNamePerExecWithFlowExecId(Mockito.anyString(), Mockito.any()))
+        .thenReturn("test-workflow-id");
+
+    workflow = new ProcessWorkUnitsWorkflowImpl();
+  }
+
+  @AfterMethod
+  public void tearDown() {
+    if (workflowMockedStatic != null) {
+      workflowMockedStatic.close();
+    }
+    if (workerConfigMockedStatic != null) {
+      workerConfigMockedStatic.close();
+    }
+    if (helpMockedStatic != null) {
+      helpMockedStatic.close();
+    }
+  }
+
+  /**
+   * Tests that CommitStepWorkflow child workflow is created without explicit 
task queue,
+   * allowing it to inherit the parent workflow's task queue (default queue).
+   * This ensures CommitStepWorkflow runs on WorkFulfillmentWorker, not 
ExecutionWorker.
+   */
+  @Test
+  public void testCreateCommitStepWorkflowUsesDefaultQueue() {
+    // Setup
+    Map<String, Object> searchAttributes = new HashMap<>();
+    searchAttributes.put("test", "value");
+
+    Config mockConfig = ConfigFactory.empty();
+    workerConfigMockedStatic.when(() -> WorkerConfig.of(Mockito.any()))
+        .thenReturn(java.util.Optional.of(mockConfig));
+
+    CommitStepWorkflow mockCommitWorkflow = 
Mockito.mock(CommitStepWorkflow.class);
+
+    // Capture the ChildWorkflowOptions passed to newChildWorkflowStub
+    workflowMockedStatic.when(() -> Workflow.newChildWorkflowStub(
+        Mockito.eq(CommitStepWorkflow.class),
+        Mockito.any(ChildWorkflowOptions.class)))
+        .thenAnswer(invocation -> {
+          ChildWorkflowOptions options = invocation.getArgument(1);
+          // Verify task queue is NOT set (should be null to inherit from 
parent)
+          Assert.assertNull(options.getTaskQueue(),
+              "CommitStepWorkflow should not have explicit task queue set");
+          Assert.assertEquals(options.getSearchAttributes(), searchAttributes);
+          return mockCommitWorkflow;
+        });
+
+    // Execute
+    CommitStepWorkflow result = 
workflow.createCommitStepWorkflow(searchAttributes);
+
+    // Verify
+    Assert.assertNotNull(result);
+    workflowMockedStatic.verify(() -> Workflow.newChildWorkflowStub(
+        Mockito.eq(CommitStepWorkflow.class),
+        Mockito.any(ChildWorkflowOptions.class)), Mockito.times(1));
+  }
+
+  /**
+   * Tests that CommitStepWorkflow creation works when WorkerConfig is absent.
+   */
+  @Test
+  public void testCreateCommitStepWorkflowWithoutWorkerConfig() {
+    // Setup
+    Map<String, Object> searchAttributes = new HashMap<>();
+
+    workerConfigMockedStatic.when(() -> WorkerConfig.of(Mockito.any()))
+        .thenReturn(java.util.Optional.empty());
+
+    CommitStepWorkflow mockCommitWorkflow = 
Mockito.mock(CommitStepWorkflow.class);
+
+    workflowMockedStatic.when(() -> Workflow.newChildWorkflowStub(
+        Mockito.eq(CommitStepWorkflow.class),
+        Mockito.any(ChildWorkflowOptions.class)))
+        .thenReturn(mockCommitWorkflow);
+
+    // Execute
+    CommitStepWorkflow result = 
workflow.createCommitStepWorkflow(searchAttributes);
+
+    // Verify
+    Assert.assertNotNull(result);
+  }
+
+  /**
+   * Tests that NestingExecWorkflow is routed to execution queue when dynamic 
scaling is enabled.
+   */
+  @Test
+  public void testCreateProcessingWorkflowWithDynamicScalingEnabled() {
+    // Setup
+    Map<String, Object> searchAttributes = new HashMap<>();
+    WUProcessingSpec mockSpec = Mockito.mock(WUProcessingSpec.class);
+
+    Config mockConfig = ConfigFactory.parseString(
+        GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED + "=true\n" +
+        GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE + 
"=TestExecutionQueue"
+    );
+
+    workerConfigMockedStatic.when(() -> WorkerConfig.of(Mockito.any()))
+        .thenReturn(java.util.Optional.of(mockConfig));
+
+    NestingExecWorkflow mockNestingWorkflow = 
Mockito.mock(NestingExecWorkflow.class);
+
+    // Capture the ChildWorkflowOptions passed to newChildWorkflowStub
+    workflowMockedStatic.when(() -> Workflow.newChildWorkflowStub(
+        Mockito.eq(NestingExecWorkflow.class),
+        Mockito.any(ChildWorkflowOptions.class)))
+        .thenAnswer(invocation -> {
+          ChildWorkflowOptions options = invocation.getArgument(1);
+          // Verify task queue IS set to execution queue
+          Assert.assertEquals(options.getTaskQueue(), "TestExecutionQueue",
+              "NestingExecWorkflow should use execution queue when dynamic 
scaling is enabled");
+          return mockNestingWorkflow;
+        });
+
+    // Execute
+    NestingExecWorkflow result = workflow.createProcessingWorkflow(mockSpec, 
searchAttributes);
+
+    // Verify
+    Assert.assertNotNull(result);
+    workflowMockedStatic.verify(() -> Workflow.newChildWorkflowStub(
+        Mockito.eq(NestingExecWorkflow.class),
+        Mockito.any(ChildWorkflowOptions.class)), Mockito.times(1));
+  }
+
+  /**
+   * Tests that NestingExecWorkflow does NOT have task queue set when dynamic 
scaling is disabled.
+   */
+  @Test
+  public void testCreateProcessingWorkflowWithDynamicScalingDisabled() {
+    // Setup
+    Map<String, Object> searchAttributes = new HashMap<>();
+    WUProcessingSpec mockSpec = Mockito.mock(WUProcessingSpec.class);
+
+    Config mockConfig = ConfigFactory.parseString(
+        GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED + "=false"
+    );
+
+    workerConfigMockedStatic.when(() -> WorkerConfig.of(Mockito.any()))
+        .thenReturn(java.util.Optional.of(mockConfig));
+
+    NestingExecWorkflow mockNestingWorkflow = 
Mockito.mock(NestingExecWorkflow.class);
+
+    workflowMockedStatic.when(() -> Workflow.newChildWorkflowStub(
+        Mockito.eq(NestingExecWorkflow.class),
+        Mockito.any(ChildWorkflowOptions.class)))
+        .thenAnswer(invocation -> {
+          ChildWorkflowOptions options = invocation.getArgument(1);
+          // Verify task queue is NOT set (inherits from parent)
+          Assert.assertNull(options.getTaskQueue(),
+              "NestingExecWorkflow should not have explicit task queue when 
dynamic scaling is disabled");
+          return mockNestingWorkflow;
+        });
+
+    // Execute
+    NestingExecWorkflow result = workflow.createProcessingWorkflow(mockSpec, 
searchAttributes);
+
+    // Verify
+    Assert.assertNotNull(result);
+  }
+
+  /**
+   * Tests that NestingExecWorkflow does NOT have task queue set when dynamic 
scaling config is absent.
+   */
+  @Test
+  public void testCreateProcessingWorkflowWithoutDynamicScalingConfig() {
+    // Setup
+    Map<String, Object> searchAttributes = new HashMap<>();
+    WUProcessingSpec mockSpec = Mockito.mock(WUProcessingSpec.class);
+
+    Config mockConfig = ConfigFactory.empty();
+
+    workerConfigMockedStatic.when(() -> WorkerConfig.of(Mockito.any()))
+        .thenReturn(java.util.Optional.of(mockConfig));
+
+    NestingExecWorkflow mockNestingWorkflow = 
Mockito.mock(NestingExecWorkflow.class);
+
+    workflowMockedStatic.when(() -> Workflow.newChildWorkflowStub(
+        Mockito.eq(NestingExecWorkflow.class),
+        Mockito.any(ChildWorkflowOptions.class)))
+        .thenAnswer(invocation -> {
+          ChildWorkflowOptions options = invocation.getArgument(1);
+          // Verify task queue is NOT set (inherits from parent)
+          Assert.assertNull(options.getTaskQueue(),
+              "NestingExecWorkflow should not have explicit task queue when 
dynamic scaling config is absent");
+          return mockNestingWorkflow;
+        });
+
+    // Execute
+    workflow.createProcessingWorkflow(mockSpec, searchAttributes);
+  }
+}

Reply via email to