khandelwal-prateek commented on code in PR #4093:
URL: https://github.com/apache/gobblin/pull/4093#discussion_r1968820936


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java:
##########
@@ -84,4 +84,34 @@ public interface GobblinTemporalConfigurationKeys {
   String TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds";
   int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10;
   String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY = 
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions";
+
+  /**
+   * Activities timeout configs
+   */
+  String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = PREFIX + 
"activity.heartbeat.timeout.minutes";
+  int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = 5;
+  String TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = PREFIX + 
"activity.heartbeat.interval.minutes";
+  int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = 1;
+  String ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 
"activity.starttoclose.timeout.minutes";
+  int DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 360;
+  String TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "generate.workunits." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "recommend.scaling." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "delete.work.dirs." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options";
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
+  int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.interval.seconds";
+  int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 100;
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "backoff.coefficient";
+  int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 2;

Review Comment:
   the `backoffCoefficient` is typically defined as a double.. `RetryOptions` 
class also takes input as double



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import lombok.Getter;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Enum representing different types of activities in the Temporal workflow.
+ * Each activity type corresponds to a specific operation that can be 
performed.
+ */
+public enum ActivityType {
+  /** Activity type for generating work units. */
+  
GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for recommending scaling operations. */
+  
RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for deleting work directories. */
+  
DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for processing a work unit. */
+  
PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for committing step. */
+  
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Default placeholder activity type. */
+  
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
+
+  @Getter private final String startToCloseTimeoutConfigKey;
+
+  ActivityType(String startToCloseTimeoutConfigKey) {
+    this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey;
+  }
+
+  public ActivityOptions buildActivityOptions(Properties props) {
+    return ActivityOptions.newBuilder()
+        .setStartToCloseTimeout(getStartToCloseTimeout(props))
+        .setHeartbeatTimeout(getHeartbeatTimeout(props))
+        .setRetryOptions(buildRetryOptions(props))
+        .build();
+  }
+
+  private Duration getStartToCloseTimeout(Properties props) {
+    return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, 
this.startToCloseTimeoutConfigKey,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+  }
+
+  private Duration getHeartbeatTimeout(Properties props) {
+    return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
+        
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+  }
+
+  private RetryOptions buildRetryOptions(Properties props) {
+    return RetryOptions.newBuilder()
+        
.setInitialInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
+            
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS)))
+        
.setMaximumInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
+            
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS)))
+        .setBackoffCoefficient(PropertiesUtils.getPropAsInt(props,

Review Comment:
   use `getPropAsDouble` instead of `getPropAsInt` as per above change of using 
double



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import io.temporal.activity.ActivityOptions;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+
+/** Tests for {@link ActivityType} */
+public class ActivityTypeTest {
+
+  private Properties props;
+  private final List<ActivityType> activityTypes = 
Arrays.asList(ActivityType.values());
+
+  @BeforeMethod
+  public void setUp() {
+    props = new Properties();
+  }
+
+  @Test
+  public void testDefaultValuesForTimeouts() {
+    activityTypes.stream().map(activityType -> 
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+      Assert.assertEquals(activityOptions.getStartToCloseTimeout(),
+          
Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+      Assert.assertEquals(activityOptions.getHeartbeatTimeout(),
+          
Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(),
+          
Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(),
+          
Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(),
+          (double) 
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT);
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(),
+          
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS);
+    });
+  }
+
+  @DataProvider(name = "activityTypesWithStartToCloseTimeout")
+  public Object[][] activityTypesWithStartToCloseTimeout() {
+    return new Object[][] {
+        {ActivityType.GENERATE_WORKUNITS, 333},
+        {ActivityType.RECOMMEND_SCALING, 111},
+        {ActivityType.DELETE_WORK_DIRS, 222},
+        {ActivityType.PROCESS_WORKUNIT, 555},
+        {ActivityType.COMMIT, 444},
+        {ActivityType.DEFAULT_ACTIVITY, 1}
+    };
+  }
+
+  @Test(dataProvider = "activityTypesWithStartToCloseTimeout")
+  public void testStartToCloseTimeout(ActivityType activityType, int 
expectedTimeout) {
+    props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), 
Integer.toString(expectedTimeout));
+    
Assert.assertEquals(activityType.buildActivityOptions(props).getStartToCloseTimeout(),
 Duration.ofMinutes(expectedTimeout));
+  }
+
+  @Test
+  public void testHeartBeatTimeout() {
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
 "14");
+    activityTypes.stream().map(activityType -> 
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+      Assert.assertEquals(activityOptions.getHeartbeatTimeout(), 
Duration.ofMinutes(14));
+    });
+  }
+
+  @Test
+  public void testRetryOptions() {
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
 "115");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
 "5550");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
 "7");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
 "21");
+
+    activityTypes.stream().map(activityType -> 
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+      
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), 
Duration.ofSeconds(115));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), 
Duration.ofSeconds(5550));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), 
7.0);
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21);
+    });
+  }
+
+  @Test(dataProvider = "activityTypesWithStartToCloseTimeout")
+  public void testBuildActivityOptions(ActivityType activityType, int 
expectedTimeout) {
+    props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), 
Integer.toString(expectedTimeout));
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
 "144");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
 "115");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
 "5550");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
 "7");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
 "21");
+
+    ActivityOptions activityOptions = activityType.buildActivityOptions(props);
+
+    Assert.assertEquals(activityOptions.getStartToCloseTimeout(), 
Duration.ofMinutes(expectedTimeout));
+    Assert.assertEquals(activityOptions.getHeartbeatTimeout(), 
Duration.ofMinutes(144));
+    
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), 
Duration.ofSeconds(115));
+    
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), 
Duration.ofSeconds(5550));
+    
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), 
7.0);

Review Comment:
   since double values have chances of rounding issues, include a delta also in 
the assertion `(..., 7.0, .001)`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -81,58 +81,20 @@
 public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
   public static final String PROCESS_WORKFLOW_ID_BASE = "ProcessWorkUnits";
 
-  public static final Duration genWUsStartToCloseTimeout = 
Duration.ofHours(2); // TODO: make configurable... also add activity heartbeats
-
-  private static final RetryOptions GEN_WUS_ACTIVITY_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(3))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(4)
-      .build();
-
-  private static final ActivityOptions GEN_WUS_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(genWUsStartToCloseTimeout)
-      .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS)
-      .build();
-
-  private final GenerateWorkUnits genWUsActivityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class, GEN_WUS_ACTIVITY_OPTS);
-
-  private static final RetryOptions RECOMMEND_SCALING_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(3))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(4)
-      .build();
-
-  private static final ActivityOptions RECOMMEND_SCALING_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(Duration.ofMinutes(5))
-      .setRetryOptions(RECOMMEND_SCALING_RETRY_OPTS)
-      .build();
-  private final RecommendScalingForWorkUnits recommendScalingStub = 
Workflow.newActivityStub(RecommendScalingForWorkUnits.class,
-      RECOMMEND_SCALING_ACTIVITY_OPTS);
-
-  private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(3))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(4)
-      .build();
-
-  private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(Duration.ofMinutes(10))
-      .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
-      .build();
-  private final DeleteWorkDirsActivity deleteWorkDirsActivityStub = 
Workflow.newActivityStub(DeleteWorkDirsActivity.class, 
DELETE_WORK_DIRS_ACTIVITY_OPTS);
-
   @Override
   public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
     timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // 
update GaaS: `TimingEvent.JOB_START_TIME`
     EventTimer jobSuccessTimer = timerFactory.createJobTimer();
     Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult = 
Optional.empty();
     WUProcessingSpec wuSpec = createProcessingSpec(jobProps, 
eventSubmitterContext);
+    // Filtering only temporal job properties to pass to child workflows to 
avoid passing unnecessary properties
+    final Properties temporalJobProps = 
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
+        
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));

Review Comment:
   import `com.google.common.base.Optional`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import lombok.Getter;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Enum representing different types of activities in the Temporal workflow.
+ * Each activity type corresponds to a specific operation that can be 
performed.
+ */
+public enum ActivityType {
+  /** Activity type for generating work units. */
+  
GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for recommending scaling operations. */
+  
RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for deleting work directories. */
+  
DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for processing a work unit. */
+  
PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for committing step. */
+  
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Default placeholder activity type. */
+  
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
+
+  @Getter private final String startToCloseTimeoutConfigKey;
+
+  ActivityType(String startToCloseTimeoutConfigKey) {
+    this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey;
+  }
+
+  public ActivityOptions buildActivityOptions(Properties props) {
+    return ActivityOptions.newBuilder()
+        .setStartToCloseTimeout(getStartToCloseTimeout(props))
+        .setHeartbeatTimeout(getHeartbeatTimeout(props))
+        .setRetryOptions(buildRetryOptions(props))
+        .build();
+  }
+
+  private Duration getStartToCloseTimeout(Properties props) {
+    return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, 
this.startToCloseTimeoutConfigKey,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+  }
+
+  private Duration getHeartbeatTimeout(Properties props) {
+    return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
+        
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+  }
+
+  private RetryOptions buildRetryOptions(Properties props) {
+    return RetryOptions.newBuilder()
+        
.setInitialInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props,

Review Comment:
   we can check that the initial interval is not greater than max interval



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java:
##########
@@ -74,7 +74,7 @@ public interface GobblinTemporalConfigurationKeys {
 
   String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + 
"polling.interval.seconds";
   int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
-
+  

Review Comment:
   nit: remove space



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to