khandelwal-prateek commented on code in PR #4093:
URL: https://github.com/apache/gobblin/pull/4093#discussion_r1946094701


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Interface for defining activity configuration strategies for different 
Temporal activities.
+ * Each strategy provides a method to retrieve configuration details, such as 
timeout duration, based on the provided properties.
+ */
+public interface ActivityConfigurationStrategy {
+  /** Default start to close timeout duration for any activity if not 
specified. */
+  Duration defaultStartToCloseTimeout = Duration.ofMinutes(180);
+  int DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 120;

Review Comment:
   do we have numbers currently, how long can it take for each activity, 
ideally we want this number to be higher than 100th %ile of all executions that 
are running and also having scope for what we support(max data copy size - 
something that we will have as part of our SLAs for users)



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Interface for defining activity configuration strategies for different 
Temporal activities.
+ * Each strategy provides a method to retrieve configuration details, such as 
timeout duration, based on the provided properties.
+ */
+public interface ActivityConfigurationStrategy {
+  /** Default start to close timeout duration for any activity if not 
specified. */
+  Duration defaultStartToCloseTimeout = Duration.ofMinutes(180);
+  int DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 120;
+  int DEFAULT_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 5;
+  int DEFAULT_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 10;
+  int DEFAULT_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180;
+  int DEFAULT_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180;

Review Comment:
   let's update the default numbers to a higher value for now till we have 
better visibility and tuning in place. Also, we already see flows which are 
running close to these number and timing out those isn't correct eg. 
GENERATE_WORKUNITS=4h, PROCESS_WORKUNIT=6h, COMMIT=4h
   
   do we know how much max time max takes/can take and what is the factor that 
increases the time, if no, let's also increase DELETE_WORK_DIRS/ 
RECOMMEND_SCALING time



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+
+/**
+ * Enum representing different types of activities in the Temporal workflow.
+ * Each activity type corresponds to a specific operation that can be 
performed.
+ */
+public enum ActivityType {

Review Comment:
   a separate class is not required, this class can be merged with 
ActivityConfigurationStrategy as suggested above  by using enum in place of 
explicit strategy classes



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalActivityUtils.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
+
+
+/** Utility class for handling Temporal Activity related operations. */
+@UtilityClass
+@Slf4j
+public class TemporalActivityUtils {
+
+  @VisibleForTesting
+  protected static final RetryOptions DEFAULT_RETRY_OPTIONS = 
RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+
+  private static final Map<ActivityType, ActivityConfigurationStrategy> 
activityConfigurationStrategies = new HashMap<>();
+
+  static {
+    activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new 
ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new 
ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new 
ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new 
ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.COMMIT, new 
ActivityConfigurationStrategy.CommitActivityConfigurationStrategy());
+  }
+
+  /**
+   * Builds and returns an {@link ActivityOptions} object configured with the 
specified {@link ActivityType} and properties.
+   *
+   * @param activityType the type of the activity for which the options are 
being built.
+   * @param props the properties to be used for configuring the activity 
options.
+   * @return an {@link ActivityOptions} object configured with the specified 
activity type and properties.
+   */
+  public static ActivityOptions buildActivityOptions(ActivityType 
activityType, Properties props) {
+    return ActivityOptions.newBuilder()
+        .setStartToCloseTimeout(getStartToCloseTimeout(activityType, props))
+        .setRetryOptions(buildRetryOptions(activityType, props))
+        .build();
+  }
+
+  /**
+   * Retrieves the start to close timeout duration for a given {@link 
ActivityType} based on the provided properties.
+   *
+   * @param activityType the type of the activity for which the start to close 
timeout is being retrieved.
+   * @param props the properties to be used for configuring the timeout.
+   * @return the start to close timeout duration for the specified activity 
type.
+   */
+  private static Duration getStartToCloseTimeout(ActivityType activityType, 
Properties props) {
+    ActivityConfigurationStrategy activityConfigurationStrategy = 
activityConfigurationStrategies.get(activityType);
+    if (activityConfigurationStrategy == null) {
+      log.warn("No configuration strategy found for activity type {}. Using 
default start to close timeout.", activityType);
+      return ActivityConfigurationStrategy.defaultStartToCloseTimeout;
+    }
+    return activityConfigurationStrategy.getStartToCloseTimeout(props);
+  }
+
+  /**
+   * Builds and returns an {@link RetryOptions} object configured with the 
specified {@link ActivityType} and properties.
+   *
+   * @param activityType the type of the activity for which the options are 
being built.
+   * @param props the properties to be used for configuring the activity 
options.
+   * @return an {@link RetryOptions} object configured with the specified 
activity type and properties.
+   */
+  private static RetryOptions buildRetryOptions(ActivityType activityType, 
Properties props) {

Review Comment:
   this logic should be moved to `ActivityType` enum, where it has 
`getStartToCloseTimeout` & `getRetryOptions` methods



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalActivityUtils.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
+
+
+/** Utility class for handling Temporal Activity related operations. */
+@UtilityClass
+@Slf4j
+public class TemporalActivityUtils {
+
+  @VisibleForTesting
+  protected static final RetryOptions DEFAULT_RETRY_OPTIONS = 
RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+
+  private static final Map<ActivityType, ActivityConfigurationStrategy> 
activityConfigurationStrategies = new HashMap<>();
+
+  static {
+    activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new 
ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new 
ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new 
ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new 
ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.COMMIT, new 
ActivityConfigurationStrategy.CommitActivityConfigurationStrategy());
+  }

Review Comment:
   this map is unnecessary because ActivityType can uniquely identify the 
activity and method `getStartToCloseTimeout`/`getRetryOptions` can be directly 
called for each activity



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Interface for defining activity configuration strategies for different 
Temporal activities.
+ * Each strategy provides a method to retrieve configuration details, such as 
timeout duration, based on the provided properties.
+ */
+public interface ActivityConfigurationStrategy {
+  /** Default start to close timeout duration for any activity if not 
specified. */
+  Duration defaultStartToCloseTimeout = Duration.ofMinutes(180);
+  int DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 120;
+  int DEFAULT_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 5;
+  int DEFAULT_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 10;
+  int DEFAULT_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180;
+  int DEFAULT_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180;
+
+  /**
+   * Retrieves the start to close timeout duration for an activity based on 
the provided properties.
+   *
+   * @param props the properties to be used for configuring the timeout.
+   * @return the timeout duration for the activity.
+   */
+  Duration getStartToCloseTimeout(Properties props);
+
+  /**
+   * Configuration strategy for the Generate Workunits activity.
+   */
+  class GenerateWorkunitsActivityConfigurationStrategy implements 
ActivityConfigurationStrategy {
+    @Override
+    public Duration getStartToCloseTimeout(Properties props) {
+      return Duration.ofMinutes(PropertiesUtils.getPropAsInt(
+          props,
+          
GobblinTemporalConfigurationKeys.GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES,
+          DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES
+      ));
+    }

Review Comment:
   the same logic is repeated in every strategy class. Instead of creating 
multiple classes, please use an enum based approach where each enum has two 
fields `key` & `defaultTimeoutMinutes` and it can encapsulate the 
`getStartToCloseTimeout` method
   
   ```
   public enum ActivityType {
   ...
       ActivityType(String timeoutConfigKey, int defaultTimeoutMinutes) {
           this.timeoutConfigKey = timeoutConfigKey;
           this.defaultTimeoutMinutes = defaultTimeoutMinutes;
       }
   
       public Duration getStartToCloseTimeout(Properties props) {
           return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, 
timeoutConfigKey, defaultTimeoutMinutes));
       }
   ...
   }
   
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Interface for defining activity configuration strategies for different 
Temporal activities.
+ * Each strategy provides a method to retrieve configuration details, such as 
timeout duration, based on the provided properties.
+ */
+public interface ActivityConfigurationStrategy {
+  /** Default start to close timeout duration for any activity if not 
specified. */
+  Duration defaultStartToCloseTimeout = Duration.ofMinutes(180);

Review Comment:
   this should be static final



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalActivityUtils.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
+
+
+/** Utility class for handling Temporal Activity related operations. */
+@UtilityClass
+@Slf4j
+public class TemporalActivityUtils {
+
+  @VisibleForTesting
+  protected static final RetryOptions DEFAULT_RETRY_OPTIONS = 
RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+
+  private static final Map<ActivityType, ActivityConfigurationStrategy> 
activityConfigurationStrategies = new HashMap<>();
+
+  static {
+    activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new 
ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new 
ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new 
ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new 
ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.COMMIT, new 
ActivityConfigurationStrategy.CommitActivityConfigurationStrategy());
+  }
+
+  /**
+   * Builds and returns an {@link ActivityOptions} object configured with the 
specified {@link ActivityType} and properties.
+   *
+   * @param activityType the type of the activity for which the options are 
being built.
+   * @param props the properties to be used for configuring the activity 
options.
+   * @return an {@link ActivityOptions} object configured with the specified 
activity type and properties.
+   */
+  public static ActivityOptions buildActivityOptions(ActivityType 
activityType, Properties props) {
+    return ActivityOptions.newBuilder()
+        .setStartToCloseTimeout(getStartToCloseTimeout(activityType, props))
+        .setRetryOptions(buildRetryOptions(activityType, props))
+        .build();
+  }
+
+  /**
+   * Retrieves the start to close timeout duration for a given {@link 
ActivityType} based on the provided properties.
+   *
+   * @param activityType the type of the activity for which the start to close 
timeout is being retrieved.
+   * @param props the properties to be used for configuring the timeout.
+   * @return the start to close timeout duration for the specified activity 
type.
+   */
+  private static Duration getStartToCloseTimeout(ActivityType activityType, 
Properties props) {
+    ActivityConfigurationStrategy activityConfigurationStrategy = 
activityConfigurationStrategies.get(activityType);
+    if (activityConfigurationStrategy == null) {
+      log.warn("No configuration strategy found for activity type {}. Using 
default start to close timeout.", activityType);
+      return ActivityConfigurationStrategy.defaultStartToCloseTimeout;
+    }
+    return activityConfigurationStrategy.getStartToCloseTimeout(props);
+  }
+
+  /**
+   * Builds and returns an {@link RetryOptions} object configured with the 
specified {@link ActivityType} and properties.
+   *
+   * @param activityType the type of the activity for which the options are 
being built.
+   * @param props the properties to be used for configuring the activity 
options.
+   * @return an {@link RetryOptions} object configured with the specified 
activity type and properties.
+   */
+  private static RetryOptions buildRetryOptions(ActivityType activityType, 
Properties props) {
+    // Currently returning just the default retry options for each activity 
type
+    return DEFAULT_RETRY_OPTIONS;

Review Comment:
   retry settings are currently hardcoded which would requires a code change. 
Please use PropertiesUtils to fetch each parameter with default values



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalActivityUtils.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
+
+
+/** Utility class for handling Temporal Activity related operations. */
+@UtilityClass
+@Slf4j
+public class TemporalActivityUtils {
+
+  @VisibleForTesting
+  protected static final RetryOptions DEFAULT_RETRY_OPTIONS = 
RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+
+  private static final Map<ActivityType, ActivityConfigurationStrategy> 
activityConfigurationStrategies = new HashMap<>();
+
+  static {
+    activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new 
ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new 
ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new 
ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new 
ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy());
+    activityConfigurationStrategies.put(ActivityType.COMMIT, new 
ActivityConfigurationStrategy.CommitActivityConfigurationStrategy());
+  }
+
+  /**
+   * Builds and returns an {@link ActivityOptions} object configured with the 
specified {@link ActivityType} and properties.
+   *
+   * @param activityType the type of the activity for which the options are 
being built.
+   * @param props the properties to be used for configuring the activity 
options.
+   * @return an {@link ActivityOptions} object configured with the specified 
activity type and properties.
+   */
+  public static ActivityOptions buildActivityOptions(ActivityType 
activityType, Properties props) {

Review Comment:
   this is the only method required in this class and the method should 
leverage ActivityType enum to fetch timeout/retry info for the activity



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to