[ 
https://issues.apache.org/jira/browse/GOBBLIN-2190?focusedWorklogId=958615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-958615
 ]

ASF GitHub Bot logged work on GOBBLIN-2190:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/25 05:51
            Start Date: 25/Feb/25 05:51
    Worklog Time Spent: 10m 
      Work Description: khandelwal-prateek commented on code in PR #4093:
URL: https://github.com/apache/gobblin/pull/4093#discussion_r1968820936


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java:
##########
@@ -84,4 +84,34 @@ public interface GobblinTemporalConfigurationKeys {
   String TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds";
   int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10;
   String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY = 
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions";
+
+  /**
+   * Activities timeout configs
+   */
+  String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = PREFIX + 
"activity.heartbeat.timeout.minutes";
+  int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = 5;
+  String TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = PREFIX + 
"activity.heartbeat.interval.minutes";
+  int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = 1;
+  String ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 
"activity.starttoclose.timeout.minutes";
+  int DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 360;
+  String TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "generate.workunits." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "recommend.scaling." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "delete.work.dirs." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options";
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
+  int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.interval.seconds";
+  int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 100;
+  String TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "backoff.coefficient";
+  int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 2;

Review Comment:
   the `backoffCoefficient` is typically defined as a double.. `RetryOptions` 
class also takes input as double



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import lombok.Getter;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Enum representing different types of activities in the Temporal workflow.
+ * Each activity type corresponds to a specific operation that can be 
performed.
+ */
+public enum ActivityType {
+  /** Activity type for generating work units. */
+  
GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for recommending scaling operations. */
+  
RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for deleting work directories. */
+  
DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for processing a work unit. */
+  
PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for committing step. */
+  
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Default placeholder activity type. */
+  
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
+
+  @Getter private final String startToCloseTimeoutConfigKey;
+
+  ActivityType(String startToCloseTimeoutConfigKey) {
+    this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey;
+  }
+
+  public ActivityOptions buildActivityOptions(Properties props) {
+    return ActivityOptions.newBuilder()
+        .setStartToCloseTimeout(getStartToCloseTimeout(props))
+        .setHeartbeatTimeout(getHeartbeatTimeout(props))
+        .setRetryOptions(buildRetryOptions(props))
+        .build();
+  }
+
+  private Duration getStartToCloseTimeout(Properties props) {
+    return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, 
this.startToCloseTimeoutConfigKey,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+  }
+
+  private Duration getHeartbeatTimeout(Properties props) {
+    return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
+        
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+  }
+
+  private RetryOptions buildRetryOptions(Properties props) {
+    return RetryOptions.newBuilder()
+        
.setInitialInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
+            
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS)))
+        
.setMaximumInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props,
+            
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
+            
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS)))
+        .setBackoffCoefficient(PropertiesUtils.getPropAsInt(props,

Review Comment:
   use `getPropAsDouble` instead of `getPropAsInt` as per above change of using 
double



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import io.temporal.activity.ActivityOptions;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+
+/** Tests for {@link ActivityType} */
+public class ActivityTypeTest {
+
+  private Properties props;
+  private final List<ActivityType> activityTypes = 
Arrays.asList(ActivityType.values());
+
+  @BeforeMethod
+  public void setUp() {
+    props = new Properties();
+  }
+
+  @Test
+  public void testDefaultValuesForTimeouts() {
+    activityTypes.stream().map(activityType -> 
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+      Assert.assertEquals(activityOptions.getStartToCloseTimeout(),
+          
Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+      Assert.assertEquals(activityOptions.getHeartbeatTimeout(),
+          
Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(),
+          
Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(),
+          
Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(),
+          (double) 
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT);
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(),
+          
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS);
+    });
+  }
+
+  @DataProvider(name = "activityTypesWithStartToCloseTimeout")
+  public Object[][] activityTypesWithStartToCloseTimeout() {
+    return new Object[][] {
+        {ActivityType.GENERATE_WORKUNITS, 333},
+        {ActivityType.RECOMMEND_SCALING, 111},
+        {ActivityType.DELETE_WORK_DIRS, 222},
+        {ActivityType.PROCESS_WORKUNIT, 555},
+        {ActivityType.COMMIT, 444},
+        {ActivityType.DEFAULT_ACTIVITY, 1}
+    };
+  }
+
+  @Test(dataProvider = "activityTypesWithStartToCloseTimeout")
+  public void testStartToCloseTimeout(ActivityType activityType, int 
expectedTimeout) {
+    props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), 
Integer.toString(expectedTimeout));
+    
Assert.assertEquals(activityType.buildActivityOptions(props).getStartToCloseTimeout(),
 Duration.ofMinutes(expectedTimeout));
+  }
+
+  @Test
+  public void testHeartBeatTimeout() {
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
 "14");
+    activityTypes.stream().map(activityType -> 
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+      Assert.assertEquals(activityOptions.getHeartbeatTimeout(), 
Duration.ofMinutes(14));
+    });
+  }
+
+  @Test
+  public void testRetryOptions() {
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
 "115");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
 "5550");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
 "7");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
 "21");
+
+    activityTypes.stream().map(activityType -> 
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+      
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), 
Duration.ofSeconds(115));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), 
Duration.ofSeconds(5550));
+      
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), 
7.0);
+      
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21);
+    });
+  }
+
+  @Test(dataProvider = "activityTypesWithStartToCloseTimeout")
+  public void testBuildActivityOptions(ActivityType activityType, int 
expectedTimeout) {
+    props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), 
Integer.toString(expectedTimeout));
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
 "144");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
 "115");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
 "5550");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
 "7");
+    
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
 "21");
+
+    ActivityOptions activityOptions = activityType.buildActivityOptions(props);
+
+    Assert.assertEquals(activityOptions.getStartToCloseTimeout(), 
Duration.ofMinutes(expectedTimeout));
+    Assert.assertEquals(activityOptions.getHeartbeatTimeout(), 
Duration.ofMinutes(144));
+    
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), 
Duration.ofSeconds(115));
+    
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), 
Duration.ofSeconds(5550));
+    
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), 
7.0);

Review Comment:
   since double values have chances of rounding issues, include a delta also in 
the assertion `(..., 7.0, .001)`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -81,58 +81,20 @@
 public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
   public static final String PROCESS_WORKFLOW_ID_BASE = "ProcessWorkUnits";
 
-  public static final Duration genWUsStartToCloseTimeout = 
Duration.ofHours(2); // TODO: make configurable... also add activity heartbeats
-
-  private static final RetryOptions GEN_WUS_ACTIVITY_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(3))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(4)
-      .build();
-
-  private static final ActivityOptions GEN_WUS_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(genWUsStartToCloseTimeout)
-      .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS)
-      .build();
-
-  private final GenerateWorkUnits genWUsActivityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class, GEN_WUS_ACTIVITY_OPTS);
-
-  private static final RetryOptions RECOMMEND_SCALING_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(3))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(4)
-      .build();
-
-  private static final ActivityOptions RECOMMEND_SCALING_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(Duration.ofMinutes(5))
-      .setRetryOptions(RECOMMEND_SCALING_RETRY_OPTS)
-      .build();
-  private final RecommendScalingForWorkUnits recommendScalingStub = 
Workflow.newActivityStub(RecommendScalingForWorkUnits.class,
-      RECOMMEND_SCALING_ACTIVITY_OPTS);
-
-  private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS = 
RetryOptions.newBuilder()
-      .setInitialInterval(Duration.ofSeconds(3))
-      .setMaximumInterval(Duration.ofSeconds(100))
-      .setBackoffCoefficient(2)
-      .setMaximumAttempts(4)
-      .build();
-
-  private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(Duration.ofMinutes(10))
-      .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
-      .build();
-  private final DeleteWorkDirsActivity deleteWorkDirsActivityStub = 
Workflow.newActivityStub(DeleteWorkDirsActivity.class, 
DELETE_WORK_DIRS_ACTIVITY_OPTS);
-
   @Override
   public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
     timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // 
update GaaS: `TimingEvent.JOB_START_TIME`
     EventTimer jobSuccessTimer = timerFactory.createJobTimer();
     Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult = 
Optional.empty();
     WUProcessingSpec wuSpec = createProcessingSpec(jobProps, 
eventSubmitterContext);
+    // Filtering only temporal job properties to pass to child workflows to 
avoid passing unnecessary properties
+    final Properties temporalJobProps = 
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
+        
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));

Review Comment:
   import `com.google.common.base.Optional`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import lombok.Getter;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Enum representing different types of activities in the Temporal workflow.
+ * Each activity type corresponds to a specific operation that can be 
performed.
+ */
+public enum ActivityType {
+  /** Activity type for generating work units. */
+  
GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for recommending scaling operations. */
+  
RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for deleting work directories. */
+  
DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for processing a work unit. */
+  
PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Activity type for committing step. */
+  
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+  /** Default placeholder activity type. */
+  
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
+
+  @Getter private final String startToCloseTimeoutConfigKey;
+
+  ActivityType(String startToCloseTimeoutConfigKey) {
+    this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey;
+  }
+
+  public ActivityOptions buildActivityOptions(Properties props) {
+    return ActivityOptions.newBuilder()
+        .setStartToCloseTimeout(getStartToCloseTimeout(props))
+        .setHeartbeatTimeout(getHeartbeatTimeout(props))
+        .setRetryOptions(buildRetryOptions(props))
+        .build();
+  }
+
+  private Duration getStartToCloseTimeout(Properties props) {
+    return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, 
this.startToCloseTimeoutConfigKey,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+  }
+
+  private Duration getHeartbeatTimeout(Properties props) {
+    return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
+        
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+  }
+
+  private RetryOptions buildRetryOptions(Properties props) {
+    return RetryOptions.newBuilder()
+        
.setInitialInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props,

Review Comment:
   we can check that the initial interval is not greater than max interval



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java:
##########
@@ -74,7 +74,7 @@ public interface GobblinTemporalConfigurationKeys {
 
   String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + 
"polling.interval.seconds";
   int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
-
+  

Review Comment:
   nit: remove space





Issue Time Tracking
-------------------

    Worklog Id:     (was: 958615)
    Time Spent: 2h 40m  (was: 2.5h)

> Implement ActivityTimeoutStrategy for all Temporal Activities
> -------------------------------------------------------------
>
>                 Key: GOBBLIN-2190
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2190
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Currently TImeouts of all Temporal Activity are hardcoded and cant change 
> during runtime, change those to make them configurable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to