This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f2bcdc7329 [GOBBLIN-2190] Implement ActivityType & add HeartBeat for 
Temporal Activities (#4093)
f2bcdc7329 is described below

commit f2bcdc7329dedd5a0d19fb923f60de278ed92d6c
Author: Vivek Rai <[email protected]>
AuthorDate: Tue Feb 25 14:19:41 2025 +0530

    [GOBBLIN-2190] Implement ActivityType & add HeartBeat for Temporal 
Activities (#4093)
    
    Made StartToClose timeout configurable and added heartbeat
---
 .../temporal/GobblinTemporalConfigurationKeys.java |  30 +++++
 .../temporal/ddm/activity/ActivityType.java        | 101 +++++++++++++++++
 .../ddm/activity/impl/CommitActivityImpl.java      |  16 +++
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   |  15 +++
 .../ddm/activity/impl/ProcessWorkUnitImpl.java     |  14 +++
 .../ddm/launcher/ProcessWorkUnitsJobLauncher.java  |   2 +-
 .../gobblin/temporal/ddm/util/JobStateUtils.java   |   6 +
 .../temporal/ddm/work/WUProcessingSpec.java        |   5 +-
 .../temporal/ddm/workflow/CommitStepWorkflow.java  |   4 +-
 .../ddm/workflow/ProcessWorkUnitsWorkflow.java     |   4 +-
 .../ddm/workflow/impl/CommitStepWorkflowImpl.java  |  22 +---
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  |  61 +++-------
 .../NestingExecOfProcessWorkUnitWorkflowImpl.java  |  28 ++---
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |  35 +++---
 .../launcher/GenArbitraryLoadJobLauncher.java      |   5 +-
 ...ExecOfIllustrationItemActivityWorkflowImpl.java |  27 ++---
 .../nesting/work/NestingExecWorkloadInput.java     |  43 +++++++
 .../workflow/AbstractNestingExecWorkflowImpl.java  |  34 +++---
 .../util/nesting/workflow/NestingExecWorkflow.java |  13 +--
 .../temporal/ddm/activity/ActivityTypeTest.java    | 124 +++++++++++++++++++++
 .../org/apache/gobblin/util/PropertiesUtils.java   |   8 ++
 .../apache/gobblin/util/PropertiesUtilsTest.java   |  16 +++
 22 files changed, 455 insertions(+), 158 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 82ea646090..f2ebcc31ea 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -84,4 +84,34 @@ public interface GobblinTemporalConfigurationKeys {
   String TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds";
   int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10;
   String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY = 
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions";
+
+  /**
+   * Activity heartbeat, start-to-close timeout and retry configs (keys and their defaults)
+   */
+  String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = PREFIX + 
"activity.heartbeat.timeout.minutes";
+  int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = 5;
+  String TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = PREFIX + 
"activity.heartbeat.interval.minutes";
+  int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = 1;
+  String ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 
"activity.starttoclose.timeout.minutes"; // un-prefixed; appended as the suffix of the per-activity keys below
+  int DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 360; // 6 hours
+  String TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "generate.workunits." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "recommend.scaling." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "delete.work.dirs." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options"; // base key (no trailing dot); sub-keys add ".<name>"
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + ".initial.interval.seconds";
+  int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + ".maximum.interval.seconds";
+  int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 100;
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + ".backoff.coefficient";
+  double DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 2;
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + ".maximum.attempts";
+  int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4;
+
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
new file mode 100644
index 0000000000..7c796ebe35
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import lombok.Getter;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Types of Temporal activities; each constant holds the config key for its start-to-close timeout.
+ * {@link #buildActivityOptions(Properties)} derives the timeouts and retry options for a type from props.
+ */
+public enum ActivityType {
+  /** Activity type for generating work units. */
+  
GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for recommending scaling operations. */
+  
RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for deleting work directories. */
+  
DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for processing a work unit. */
+  
PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for the commit step. */
+  
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Default placeholder activity type; keyed by the generic start-to-close timeout config key. */
+  
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
+
+  @Getter private final String startToCloseTimeoutConfigKey;
+
+  ActivityType(String startToCloseTimeoutConfigKey) {
+    this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey;
+  }
+
+  public ActivityOptions buildActivityOptions(Properties props) { // start-to-close + heartbeat timeouts and retry options, each read from props with a default
+    return ActivityOptions.newBuilder()
+        .setStartToCloseTimeout(getStartToCloseTimeout(props))
+        .setHeartbeatTimeout(getHeartbeatTimeout(props))
+        .setRetryOptions(buildRetryOptions(props))
+        .build();
+  }
+
+  private Duration getStartToCloseTimeout(Properties props) { // minutes, looked up under this type's own config key
+    return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, 
this.startToCloseTimeoutConfigKey,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+  }
+
+  private Duration getHeartbeatTimeout(Properties props) { // minutes, one key shared by every activity type
+    return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
+        
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+  }
+
+  private RetryOptions buildRetryOptions(Properties props) { // retry keys are shared by every activity type
+    int maximumIntervalSeconds = PropertiesUtils.getPropAsInt(props,
+        
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS);
+
+    int initialIntervalSeconds = Math.min(PropertiesUtils.getPropAsInt(props,
+        
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS),
+        maximumIntervalSeconds); // cap: the initial interval never exceeds the maximum interval
+
+    return RetryOptions.newBuilder()
+        .setInitialInterval(Duration.ofSeconds(initialIntervalSeconds))
+        .setMaximumInterval(Duration.ofSeconds(maximumIntervalSeconds))
+        .setBackoffCoefficient(PropertiesUtils.getPropAsDouble(props,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
+            
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT))
+        .setMaximumAttempts(PropertiesUtils.getPropAsInt(props,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
+            
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS))
+        .build();
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index b43c079c10..b106cdc153 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -27,6 +27,9 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
@@ -41,6 +44,9 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.io.Closer;
+
+import io.temporal.activity.Activity;
+import io.temporal.activity.ActivityExecutionContext;
 import io.temporal.failure.ApplicationFailure;
 
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
@@ -85,12 +91,21 @@ public class CommitActivityImpl implements CommitActivity {
 
   @Override
   public CommitStats commit(WUProcessingSpec workSpec) {
+    ActivityExecutionContext activityExecutionContext = 
Activity.getExecutionContext();
+    ScheduledExecutorService heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(
+        
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+            
com.google.common.base.Optional.of("CommitActivityHeartBeatExecutor")));
     // TODO: Make this configurable
     int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
     Optional<String> optJobName = Optional.empty();
     AutomaticTroubleshooter troubleshooter = null;
     try (FileSystem fs = Help.loadFileSystem(workSpec)) {
       JobState jobState = Help.loadJobState(workSpec, fs);
+
+      int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
+      heartBeatExecutor.scheduleAtFixedRate(() -> 
activityExecutionContext.heartbeat("Running Commit Activity"),
+          heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);
+
       optJobName = Optional.ofNullable(jobState.getJobName());
       SharedResourcesBroker<GobblinScopeTypes> instanceBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
       troubleshooter = 
AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
@@ -147,6 +162,7 @@ public class CommitActivityImpl implements CommitActivity {
       String errCorrelator = String.format("Commit [%s]", 
calcCommitId(workSpec));
       EventSubmitter eventSubmitter = 
workSpec.getEventSubmitterContext().create();
       Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, 
errCorrelator);
+      ExecutorsUtils.shutdownExecutorService(heartBeatExecutor, 
com.google.common.base.Optional.of(log));
     }
   }
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 33f60bd779..fdeaf42487 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -24,6 +24,9 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +41,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.io.Closer;
 import com.tdunning.math.stats.TDigest;
 import io.temporal.failure.ApplicationFailure;
+import io.temporal.activity.Activity;
+import io.temporal.activity.ActivityExecutionContext;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -66,6 +71,7 @@ import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
 import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.writer.initializer.WriterInitializer;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 
@@ -122,11 +128,19 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
 
   @Override
   public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, 
EventSubmitterContext eventSubmitterContext) {
+    ActivityExecutionContext activityExecutionContext = 
Activity.getExecutionContext();
+    ScheduledExecutorService heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(
+        
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+            
com.google.common.base.Optional.of("GenerateWorkUnitsActivityHeartBeatExecutor")));
     // TODO: decide whether to acquire a job lock (as MR did)!
     // TODO: provide for job cancellation (unless handling at the 
temporal-level of parent workflows)!
     JobState jobState = new JobState(jobProps);
     log.info("Created jobState: {}", jobState.toJsonString(true));
 
+    int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
+    heartBeatExecutor.scheduleAtFixedRate(() -> 
activityExecutionContext.heartbeat("Running GenerateWorkUnits"),
+        heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);
+
     Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
     log.info("Using work dir root path for job '{}' - '{}'", 
jobState.getJobId(), workDirRoot);
 
@@ -177,6 +191,7 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
     } finally {
       EventSubmitter eventSubmitter = eventSubmitterContext.create();
       Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, 
jobState.getJobId());
+      ExecutorsUtils.shutdownExecutorService(heartBeatExecutor, 
com.google.common.base.Optional.of(log));
     }
   }
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index a6753245b7..9450bc7bcb 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -28,6 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.Lists;
+import io.temporal.activity.Activity;
+import io.temporal.activity.ActivityExecutionContext;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -56,6 +61,7 @@ import 
org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
 
 
@@ -68,6 +74,10 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
 
   @Override
   public int processWorkUnit(WorkUnitClaimCheck wu) {
+    ActivityExecutionContext activityExecutionContext = 
Activity.getExecutionContext();
+    ScheduledExecutorService heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor(
+        
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+            
com.google.common.base.Optional.of("ProcessWorkUnitActivityHeartBeatExecutor"))); // label specific to this activity's heartbeat executor
     AutomaticTroubleshooter troubleshooter = null;
     EventSubmitter eventSubmitter = wu.getEventSubmitterContext().create();
     String correlator = String.format("(M)WU [%s]", wu.getCorrelator());
@@ -75,6 +85,9 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
       List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
       log.info("{} - loaded; found {} workUnits", correlator, 
workUnits.size());
       JobState jobState = Help.loadJobState(wu, fs);
+      int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
+      heartBeatExecutor.scheduleAtFixedRate(() -> 
activityExecutionContext.heartbeat("Running ProcessWorkUnit Activity"),
+          heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);
       troubleshooter = 
AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
       troubleshooter.start();
       return execute(workUnits, wu, jobState, fs, 
troubleshooter.getIssueRepository());
@@ -82,6 +95,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
       throw new RuntimeException(e);
     } finally {
       Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, 
correlator);
+      ExecutorsUtils.shutdownExecutorService(heartBeatExecutor, 
com.google.common.base.Optional.of(log));
     }
   }
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 709a9cf935..954269948f 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -112,7 +112,7 @@ public class ProcessWorkUnitsJobLauncher extends 
GobblinTemporalJobLauncher {
       Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, 
Help.loadFileSystem(wuSpec)));
 
       ProcessWorkUnitsWorkflow workflow = 
this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
-      workflow.process(wuSpec);
+      workflow.process(wuSpec, jobProps);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
index e606601956..be31d3ec4f 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -46,6 +46,7 @@ import org.apache.gobblin.runtime.SourceDecorator;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.source.Source;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import 
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.util.JobLauncherUtils;
@@ -238,4 +239,9 @@ public class JobStateUtils {
             GobblinScopeTypes.GLOBAL.defaultScopeInstance());
     return globalBroker.newSubscopedBuilder(new 
JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build();
   }
+
+  public static int getHeartBeatInterval(JobState jobState) { // activity heartbeat period, in minutes
+    final int fallbackMinutes = GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES;
+    return jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES, fallbackMinutes);
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index dfa207f66c..3a05e60f01 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -18,7 +18,6 @@
 package org.apache.gobblin.temporal.ddm.work;
 
 import java.net.URI;
-import java.util.Optional;
 
 import org.apache.hadoop.fs.Path;
 
@@ -36,8 +35,6 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
-import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
-import org.apache.gobblin.temporal.util.nesting.work.Workload;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
 
@@ -75,7 +72,7 @@ public class WUProcessingSpec implements FileSystemApt, 
FileSystemJobStateful {
     return new Path(new Path(workUnitsDir).getParent(), 
AbstractJobLauncher.JOB_STATE_FILE_NAME);
   }
 
-  /** Configuration for {@link 
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(WorkflowAddr,
 Workload, int, int, int, Optional)}*/
+  /** Configuration for {@link 
org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput} */
   @Data
   @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
   @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
index c536828520..e47c0a9b15 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.temporal.ddm.workflow;
 
+import java.util.Properties;
+
 import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
 
@@ -35,5 +37,5 @@ public interface CommitStepWorkflow {
      * @return number of workunits committed
      */
     @WorkflowMethod
-    CommitStats commit(WUProcessingSpec workSpec);
+    CommitStats commit(WUProcessingSpec workSpec, Properties props);
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
index a6018d41f1..8adfd34185 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.temporal.ddm.workflow;
 
+import java.util.Properties;
+
 import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
 
@@ -30,5 +32,5 @@ import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 public interface ProcessWorkUnitsWorkflow {
   /** @return the number of {@link WorkUnit}s cumulatively processed 
successfully */
   @WorkflowMethod
-  CommitStats process(WUProcessingSpec wuSpec);
+  CommitStats process(WUProcessingSpec workSpec, Properties props);
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 0f28901883..b440658904 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -17,13 +17,11 @@
 
 package org.apache.gobblin.temporal.ddm.workflow.impl;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
-import io.temporal.activity.ActivityOptions;
-import io.temporal.common.RetryOptions;
 import io.temporal.failure.ApplicationFailure;
 import io.temporal.workflow.Workflow;
 
@@ -31,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.DatasetTaskSummary;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
 import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.DatasetStats;
@@ -42,22 +41,9 @@ import 
org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
 @Slf4j
 public class CommitStepWorkflowImpl implements CommitStepWorkflow {
 
-  private static final RetryOptions ACTIVITY_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(3))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(4)
-      .build();
-
-  private static final ActivityOptions ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(Duration.ofHours(3)) // TODO: make 
configurable... also add activity heartbeats
-      .setRetryOptions(ACTIVITY_RETRY_OPTS)
-      .build();
-
-  private final CommitActivity activityStub = 
Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS);
-
   @Override
-  public CommitStats commit(WUProcessingSpec workSpec) {
+  public CommitStats commit(WUProcessingSpec workSpec, final Properties props) 
{
+    final CommitActivity activityStub = 
Workflow.newActivityStub(CommitActivity.class, 
ActivityType.COMMIT.buildActivityOptions(props));
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
     if (!commitGobblinStats.getOptFailure().isPresent() || 
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
       TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext());
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 15184c6ccc..661847bdce 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -39,9 +39,7 @@ import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import io.temporal.activity.ActivityOptions;
 import io.temporal.api.enums.v1.ParentClosePolicy;
-import io.temporal.common.RetryOptions;
 import io.temporal.failure.ApplicationFailure;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
@@ -50,6 +48,7 @@ import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
 import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
 import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
@@ -72,6 +71,7 @@ import 
org.apache.gobblin.temporal.dynamic.ScalingDirectivesRecipient;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
 import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
@@ -81,49 +81,6 @@ import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
   public static final String PROCESS_WORKFLOW_ID_BASE = "ProcessWorkUnits";
 
-  public static final Duration genWUsStartToCloseTimeout = 
Duration.ofHours(2); // TODO: make configurable... also add activity heartbeats
-
-  private static final RetryOptions GEN_WUS_ACTIVITY_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(3))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(4)
-      .build();
-
-  private static final ActivityOptions GEN_WUS_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(genWUsStartToCloseTimeout)
-      .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS)
-      .build();
-
-  private final GenerateWorkUnits genWUsActivityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class, GEN_WUS_ACTIVITY_OPTS);
-
-  private static final RetryOptions RECOMMEND_SCALING_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(3))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(4)
-      .build();
-
-  private static final ActivityOptions RECOMMEND_SCALING_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(Duration.ofMinutes(5))
-      .setRetryOptions(RECOMMEND_SCALING_RETRY_OPTS)
-      .build();
-  private final RecommendScalingForWorkUnits recommendScalingStub = 
Workflow.newActivityStub(RecommendScalingForWorkUnits.class,
-      RECOMMEND_SCALING_ACTIVITY_OPTS);
-
-  private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(3))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(4)
-      .build();
-
-  private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(Duration.ofMinutes(10))
-      .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
-      .build();
-  private final DeleteWorkDirsActivity deleteWorkDirsActivityStub = 
Workflow.newActivityStub(DeleteWorkDirsActivity.class, 
DELETE_WORK_DIRS_ACTIVITY_OPTS);
-
   @Override
   public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
@@ -131,8 +88,13 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
     EventTimer jobSuccessTimer = timerFactory.createJobTimer();
     Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult = 
Optional.empty();
     WUProcessingSpec wuSpec = createProcessingSpec(jobProps, 
eventSubmitterContext);
+    // Filtering only temporal job properties to pass to child workflows to 
avoid passing unnecessary properties
+    final Properties temporalJobProps = 
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
+        
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
     boolean isSuccessful = false;
     try (Closer closer = Closer.create()) {
+      final GenerateWorkUnits genWUsActivityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class,
+          
ActivityType.GENERATE_WORKUNITS.buildActivityOptions(temporalJobProps));
       GenerateWorkUnitsResult generateWorkUnitResult = 
genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
       optGenerateWorkUnitResult = Optional.of(generateWorkUnitResult);
       WorkUnitsSizeSummary wuSizeSummary = 
generateWorkUnitResult.getWorkUnitsSizeSummary();
@@ -141,6 +103,8 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
       CommitStats commitStats = CommitStats.createEmpty();
       if (numWUsGenerated > 0) {
         TimeBudget timeBudget = 
calcWUProcTimeBudget(jobSuccessTimer.getStartTime(), wuSizeSummary, jobProps);
+        final RecommendScalingForWorkUnits recommendScalingStub = 
Workflow.newActivityStub(RecommendScalingForWorkUnits.class,
+            
ActivityType.RECOMMEND_SCALING.buildActivityOptions(temporalJobProps));
         List<ScalingDirective> scalingDirectives =
             recommendScalingStub.recommendScaling(wuSizeSummary, 
generateWorkUnitResult.getSourceClass(), timeBudget, jobProps);
         log.info("Recommended scaling to process WUs within {}: {}", 
timeBudget, scalingDirectives);
@@ -156,7 +120,7 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
         }
 
         ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
-        commitStats = processWUsWorkflow.process(wuSpec);
+        commitStats = processWUsWorkflow.process(wuSpec, temporalJobProps);
         numWUsCommitted = commitStats.getNumCommittedWorkUnits();
       }
       jobSuccessTimer.stop();
@@ -270,6 +234,11 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
       return;
     }
 
+    final DeleteWorkDirsActivity deleteWorkDirsActivityStub = 
Workflow.newActivityStub(
+        DeleteWorkDirsActivity.class,
+        
ActivityType.DELETE_WORK_DIRS.buildActivityOptions(jobState.getProperties())
+    );
+
     DirDeletionResult dirDeletionResult = 
deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
         calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
 
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
index 96591e6547..6f22e3dd8b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
@@ -17,13 +17,13 @@
 
 package org.apache.gobblin.temporal.ddm.workflow.impl;
 
-import io.temporal.activity.ActivityOptions;
-import io.temporal.common.RetryOptions;
+import java.util.Properties;
+
 import io.temporal.workflow.Async;
 import io.temporal.workflow.Promise;
 import io.temporal.workflow.Workflow;
-import java.time.Duration;
 
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
 import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
 import 
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;
@@ -31,25 +31,11 @@ import 
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWork
 
 /** {@link 
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for 
{@link ProcessWorkUnit} */
 public class NestingExecOfProcessWorkUnitWorkflowImpl extends 
AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> {
-  public static final Duration processWorkUnitStartToCloseTimeout = 
Duration.ofHours(3); // TODO: make configurable... also add activity heartbeats
-
-  // RetryOptions specify how to automatically handle retries when Activities 
fail.
-  private static final RetryOptions ACTIVITY_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(3))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(4)
-      .build();
-
-  private static final ActivityOptions ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(processWorkUnitStartToCloseTimeout)
-      .setRetryOptions(ACTIVITY_RETRY_OPTS)
-      .build();
-
-  private final ProcessWorkUnit activityStub = 
Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS);
 
   @Override
-  protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu) {
-    return Async.function(activityStub::processWorkUnit, wu);
+  protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu, 
final Properties props) {
+    final ProcessWorkUnit processWorkUnitStub = 
Workflow.newActivityStub(ProcessWorkUnit.class,
+        ActivityType.PROCESS_WORKUNIT.buildActivityOptions(props));
+    return Async.function(processWorkUnitStub::processWorkUnit, wu);
   }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 7b3f171967..00def8a2ee 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -18,10 +18,10 @@ package org.apache.gobblin.temporal.ddm.workflow.impl;
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 
 import lombok.extern.slf4j.Slf4j;
 
-import com.google.common.io.Closer;
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.api.enums.v1.ParentClosePolicy;
@@ -33,6 +33,7 @@ import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import 
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
+import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
@@ -45,7 +46,6 @@ import 
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
 import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
-import org.apache.gobblin.runtime.JobState;
 
 
 @Slf4j
@@ -54,37 +54,30 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
   public static final String COMMIT_STEP_WORKFLOW_ID_BASE = 
"CommitStepWorkflow";
 
   @Override
-  public CommitStats process(WUProcessingSpec workSpec) {
+  public CommitStats process(WUProcessingSpec workSpec, final Properties 
props) {
     Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec);
-    CommitStats result = performWork(workSpec);
+    CommitStats result = performWork(workSpec, props);
     timer.ifPresent(EventTimer::stop);
     return result;
   }
 
-  private CommitStats performWork(WUProcessingSpec workSpec) {
+  private CommitStats performWork(WUProcessingSpec workSpec, final Properties 
props) {
     Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
-    Map<String, Object> searchAttributes;
-    JobState jobState;
-    try (Closer closer = Closer.create()) {
-      jobState = Help.loadJobState(workSpec, 
closer.register(Help.loadFileSystem(workSpec)));
-    } catch (Exception e) {
-      log.error("Error loading jobState", e);
-      throw new RuntimeException("Error loading jobState", e);
-    }
-    searchAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
-
+    Map<String, Object> searchAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(props);
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec, searchAttributes);
 
     Optional<Integer> workunitsProcessed = Optional.empty();
     try {
-      workunitsProcessed = 
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+      NestingExecWorkloadInput<WorkUnitClaimCheck>
+          performWorkloadInput = new 
NestingExecWorkloadInput<>(WorkflowAddr.ROOT, workload, 0,
           workSpec.getTuning().getMaxBranchesPerTree(), 
workSpec.getTuning().getMaxSubTreesPerTree(),
-          Optional.empty()));
+          Optional.empty(), props);
+      workunitsProcessed = 
Optional.of(processingWorkflow.performWorkload(performWorkloadInput));
     } catch (Exception e) {
       log.error("ProcessWorkUnits failure - attempting partial commit before 
re-throwing exception", e);
 
       try {
-        performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, 
workunitsProcessed);// Attempt partial commit before surfacing the failure
+        performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, 
workunitsProcessed, props);// Attempt partial commit before surfacing the 
failure
       } catch (Exception commitException) {
         // Combine current and commit exception messages for a more complete 
context
         String combinedMessage = String.format(
@@ -100,18 +93,18 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
       }
       throw e;// Re-throw after any partial commit, to fail the parent 
workflow in case commitWorkflow didn't flow (unlikely)
     }
-    return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, 
workunitsProcessed);
+    return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, 
workunitsProcessed, props);
   }
 
   private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec 
workSpec,
-      Map<String, Object> searchAttributes, Optional<Integer> 
workunitsProcessed) {
+      Map<String, Object> searchAttributes, Optional<Integer> 
workunitsProcessed, Properties props) {
     //  we are only inhibiting commit when workunitsProcessed is actually 
known to be zero
     if (workunitsProcessed.filter(n -> n == 0).isPresent()) {
       log.error("No work units processed, so no commit attempted.");
       return CommitStats.createEmpty();
     }
     CommitStepWorkflow commitWorkflow = 
createCommitStepWorkflow(searchAttributes);
-    CommitStats result = commitWorkflow.commit(workSpec);
+    CommitStats result = commitWorkflow.commit(workSpec, props);
     if (result.getNumCommittedWorkUnits() == 0) {
       log.warn("No work units committed at the job level. They could have been 
committed at the task level.");
     }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
index 86f1d4a092..5315f464f3 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
@@ -38,6 +38,7 @@ import 
org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
 import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;
 import org.apache.gobblin.temporal.loadgen.work.SimpleGeneratedWorkload;
+import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
 import org.apache.gobblin.temporal.util.nesting.work.Workload;
 import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
@@ -86,6 +87,8 @@ public class GenArbitraryLoadJobLauncher extends 
GobblinTemporalJobLauncher {
     // WARNING: although type param must agree w/ that of `workload`, it's 
entirely unverified by type checker!
     // ...and more to the point, mismatch would occur at runtime 
(`performWorkload` on the workflow type given to the stub)!
     NestingExecWorkflow<IllustrationItem> workflow = 
this.client.newWorkflowStub(NestingExecWorkflow.class, options);
-    workflow.performWorkload(WorkflowAddr.ROOT, workload, 0, 
maxBranchesPerTree, maxSubTreesPerTree, Optional.empty());
+    NestingExecWorkloadInput<IllustrationItem> performWorkloadInput = new 
NestingExecWorkloadInput<>(WorkflowAddr.ROOT,
+        workload, 0, maxBranchesPerTree, maxSubTreesPerTree, Optional.empty(), 
new Properties());
+    workflow.performWorkload(performWorkloadInput);
   }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java
index 4346eecfdf..5355d9442d 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java
@@ -17,12 +17,13 @@
 
 package org.apache.gobblin.temporal.loadgen.workflow.impl;
 
-import io.temporal.activity.ActivityOptions;
-import io.temporal.common.RetryOptions;
+import java.util.Properties;
+
 import io.temporal.workflow.Async;
 import io.temporal.workflow.Promise;
 import io.temporal.workflow.Workflow;
-import java.time.Duration;
+
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
 import org.apache.gobblin.temporal.loadgen.activity.IllustrationItemActivity;
 import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;
 import 
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;
@@ -32,24 +33,10 @@ import 
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWork
 public class NestingExecOfIllustrationItemActivityWorkflowImpl
     extends AbstractNestingExecWorkflowImpl<IllustrationItem, String> {
 
-  // RetryOptions specify how to automatically handle retries when Activities 
fail.
-  private static final RetryOptions ACTIVITY_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(1))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(3)
-      .build();
-
-  private static final ActivityOptions ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(Duration.ofSeconds(10))
-      .setRetryOptions(ACTIVITY_RETRY_OPTS)
-      .build();
-
-  private final IllustrationItemActivity activityStub =
-      Workflow.newActivityStub(IllustrationItemActivity.class, ACTIVITY_OPTS);
-
   @Override
-  protected Promise<String> launchAsyncActivity(final IllustrationItem item) {
+  protected Promise<String> launchAsyncActivity(final IllustrationItem item, 
final Properties props) {
+    final IllustrationItemActivity activityStub =
+        
Workflow.newActivityStub(IllustrationItemActivity.class,ActivityType.DEFAULT_ACTIVITY.buildActivityOptions(props));
     return Async.function(activityStub::handleItem, item);
   }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/NestingExecWorkloadInput.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/NestingExecWorkloadInput.java
new file mode 100644
index 0000000000..4b2f3ff50e
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/NestingExecWorkloadInput.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+
+/** A wrapper class to be used as function param for {@link 
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(NestingExecWorkloadInput)}*/
+@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@AllArgsConstructor
+public class NestingExecWorkloadInput<WORK_ITEM> {
+  private WorkflowAddr addr;
+  private Workload<WORK_ITEM> workload;
+  private int startIndex;
+  private int maxBranchesPerTree;
+  private int maxSubTreesPerTree;
+  private Optional<Integer> maxSubTreesForCurrentTreeOverride;
+  private Properties props;
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
index 92ef6e1af9..1863724a1f 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -35,6 +36,7 @@ import io.temporal.workflow.Workflow;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
+import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
 import org.apache.gobblin.temporal.util.nesting.work.Workload;
 
@@ -46,14 +48,18 @@ public abstract class 
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
   public static final int 
MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT = 100;
 
   @Override
-  public int performWorkload(
-      final WorkflowAddr addr,
-      final Workload<WORK_ITEM> workload,
-      final int startIndex,
-      final int maxBranchesPerTree,
-      final int maxSubTreesPerTree,
-      final Optional<Integer> maxSubTreesForCurrentTreeOverride
-  ) {
+  public int performWorkload(NestingExecWorkloadInput<WORK_ITEM> 
performWorkloadInput) {
+    return performWorkloadInternal(performWorkloadInput);
+  }
+
+  private int performWorkloadInternal(NestingExecWorkloadInput<WORK_ITEM> 
performWorkloadInput) {
+    final WorkflowAddr addr = performWorkloadInput.getAddr();
+    final Workload<WORK_ITEM> workload = performWorkloadInput.getWorkload();
+    final int startIndex = performWorkloadInput.getStartIndex();
+    final int maxBranchesPerTree = 
performWorkloadInput.getMaxBranchesPerTree();
+    final int maxSubTreesPerTree = 
performWorkloadInput.getMaxSubTreesPerTree();
+    final Optional<Integer> maxSubTreesForCurrentTreeOverride = 
performWorkloadInput.getMaxSubTreesForCurrentTreeOverride();
+    final Properties props = performWorkloadInput.getProps();
     final int maxSubTreesForCurrent = 
maxSubTreesForCurrentTreeOverride.orElse(maxSubTreesPerTree);
     final int maxLeaves = maxBranchesPerTree - maxSubTreesForCurrent;
     final Optional<Workload.WorkSpan<WORK_ITEM>> optSpan = 
workload.getSpan(startIndex, maxLeaves);
@@ -65,7 +71,7 @@ public abstract class 
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
       final Workload.WorkSpan<WORK_ITEM> workSpan = optSpan.get();
       final Iterable<WORK_ITEM> iterable = () -> workSpan;
       final List<Promise<ACTIVITY_RESULT>> childActivities = 
StreamSupport.stream(iterable.spliterator(), false)
-          .map(t -> launchAsyncActivity(t))
+          .map(t -> launchAsyncActivity(t, props))
           .collect(Collectors.toList());
       final List<Promise<Integer>> childSubTrees = new ArrayList<>();
       if (workSpan.getNumElems() == maxLeaves) { // received as many as 
requested (did not stop short)
@@ -84,9 +90,11 @@ public abstract class 
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
             if (numDirectLeavesChildMayHave > 0) {
               
Workflow.sleep(calcPauseDurationBeforeCreatingSubTree(numDirectLeavesChildMayHave));
             }
-            childSubTrees.add(
-                Async.function(child::performWorkload, childAddr, workload, 
childStartIndex, maxBranchesPerTree,
-                    maxSubTreesPerTree, 
Optional.of(subTreeChildMaxSubTreesPerTree)));
+            NestingExecWorkloadInput<WORK_ITEM> childInput = new 
NestingExecWorkloadInput<>(
+                childAddr, workload, childStartIndex, maxBranchesPerTree,
+                maxSubTreesPerTree, 
Optional.of(subTreeChildMaxSubTreesPerTree), props
+            );
+            childSubTrees.add(Async.function(child::performWorkload, 
childInput));
             ++subTreeId;
           }
         }
@@ -103,7 +111,7 @@ public abstract class 
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
   }
 
   /** Factory for invoking the specific activity by providing it args via 
{@link Async::function} */
-  protected abstract Promise<ACTIVITY_RESULT> launchAsyncActivity(WORK_ITEM 
task);
+  protected abstract Promise<ACTIVITY_RESULT> launchAsyncActivity(WORK_ITEM 
task, Properties props);
 
   protected NestingExecWorkflow<WORK_ITEM> createChildWorkflow(final 
WorkflowAddr childAddr) {
     // preserve the current workflow ID of this parent, but add the 
(hierarchical) address extension specific to each child
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
index 3a6661d090..e0c3ebfa9d 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
@@ -17,12 +17,10 @@
 
 package org.apache.gobblin.temporal.util.nesting.workflow;
 
-import java.util.Optional;
-
 import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
 
-import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput;
 import org.apache.gobblin.temporal.util.nesting.work.Workload;
 
 
@@ -44,12 +42,5 @@ import 
org.apache.gobblin.temporal.util.nesting.work.Workload;
 public interface NestingExecWorkflow<WORK_ITEM> {
   /** @return the number of workload elements processed cumulatively by this 
Workflow and its children */
   @WorkflowMethod
-  int performWorkload(
-      WorkflowAddr addr,
-      Workload<WORK_ITEM> workload,
-      int startIndex,
-      int maxBranchesPerTree,
-      int maxSubTreesPerTree,
-      Optional<Integer> maxSubTreesForCurrentTreeOverride
-  );
+  int performWorkload(NestingExecWorkloadInput<WORK_ITEM> 
performWorkloadInput);
 }
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
new file mode 100644
index 0000000000..cdab706d86
--- /dev/null
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import io.temporal.activity.ActivityOptions;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+
+/** Tests for {@link ActivityType} */
+public class ActivityTypeTest {
+
+  private Properties props;
+  private final List<ActivityType> activityTypes = 
Arrays.asList(ActivityType.values());
+
+  @BeforeMethod
+  public void setUp() {
+    props = new Properties();
+  }
+
+  @Test
+  public void testDefaultValuesForTimeouts() {
+    activityTypes.stream().map(activityType -> 
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+      Assert.assertEquals(activityOptions.getStartToCloseTimeout(),
+          
Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+      Assert.assertEquals(activityOptions.getHeartbeatTimeout(),
+          
Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(),
+          
Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(),
+          
Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(),
+          
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
 0.01);
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(),
+          
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS);
+    });
+  }
+
+  @DataProvider(name = "activityTypesWithStartToCloseTimeout")
+  public Object[][] activityTypesWithStartToCloseTimeout() {
+    return new Object[][] {
+        {ActivityType.GENERATE_WORKUNITS, 333},
+        {ActivityType.RECOMMEND_SCALING, 111},
+        {ActivityType.DELETE_WORK_DIRS, 222},
+        {ActivityType.PROCESS_WORKUNIT, 555},
+        {ActivityType.COMMIT, 444},
+        {ActivityType.DEFAULT_ACTIVITY, 1}
+    };
+  }
+
+  @Test(dataProvider = "activityTypesWithStartToCloseTimeout")
+  public void testStartToCloseTimeout(ActivityType activityType, int 
expectedTimeout) {
+    props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), 
Integer.toString(expectedTimeout));
+    
Assert.assertEquals(activityType.buildActivityOptions(props).getStartToCloseTimeout(),
 Duration.ofMinutes(expectedTimeout));
+  }
+
+  @Test
+  public void testHeartBeatTimeout() {
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
 "14");
+    activityTypes.stream().map(activityType -> 
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+      Assert.assertEquals(activityOptions.getHeartbeatTimeout(), 
Duration.ofMinutes(14));
+    });
+  }
+
+  @Test
+  public void testRetryOptions() {
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
 "115");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
 "5550");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
 "7.0");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
 "21");
+
+    activityTypes.stream().map(activityType -> 
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+      
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), 
Duration.ofSeconds(115));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), 
Duration.ofSeconds(5550));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), 
7.0, 0.01);
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21);
+    });
+  }
+
+  @Test(dataProvider = "activityTypesWithStartToCloseTimeout")
+  public void testBuildActivityOptions(ActivityType activityType, int 
expectedTimeout) {
+    props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), 
Integer.toString(expectedTimeout));
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
 "144");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
 "115");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
 "5550");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
 "7.0");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
 "21");
+
+    ActivityOptions activityOptions = activityType.buildActivityOptions(props);
+
+    Assert.assertEquals(activityOptions.getStartToCloseTimeout(), 
Duration.ofMinutes(expectedTimeout));
+    Assert.assertEquals(activityOptions.getHeartbeatTimeout(), 
Duration.ofMinutes(144));
+    
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), 
Duration.ofSeconds(115));
+    
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), 
Duration.ofSeconds(5550));
+    
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), 
7.0, 0.01);
+    
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21);
+  }
+
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 28f0daa93f..db5518f54d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -92,6 +92,14 @@ public class PropertiesUtils {
     return Long.parseLong(properties.getProperty(key, 
Long.toString(defaultValue)));
   }
 
+  public static double getPropAsDouble(Properties properties, String key, 
double defaultValue) {
+    try {
+      return Double.parseDouble(properties.getProperty(key, 
Double.toString(defaultValue)));
+    } catch (NullPointerException | NumberFormatException e) {
+      return defaultValue;
+    }
+  }
+
   /**
    * Get the value of a comma separated property as a {@link List} of strings.
    *
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
index 42291f4cae..856067220d 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
@@ -101,4 +101,20 @@ public class PropertiesUtilsTest {
 
     Assert.assertEqualsNoOrder(PropertiesUtils.getValuesAsList(properties, 
Optional.of("k")).toArray(), new String[]{"v1", "v2", "v2"});
   }
+
+  @Test
+  public void testGetPropAsDouble() {
+    Properties properties = new Properties();
+    properties.put("k1", "1.0");
+    properties.put("k2", "1");
+    properties.put("k3", "1.00");
+    properties.put("k4", "");
+
+    Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "key", 
5.01), 5.01, 0.01);
+    Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k1", 
2.02), 1.00, 0.01);
+    Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k2", 
2.02), 1.00, 0.01);
+    Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k3", 
2.02), 1.00, 0.01);
+    Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k4", 
10.001), 10.001, 0.001);
+  }
+
 }

Reply via email to