[ https://issues.apache.org/jira/browse/GOBBLIN-2190?focusedWorklogId=958615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-958615 ]
ASF GitHub Bot logged work on GOBBLIN-2190: ------------------------------------------- Author: ASF GitHub Bot Created on: 25/Feb/25 05:51 Start Date: 25/Feb/25 05:51 Worklog Time Spent: 10m Work Description: khandelwal-prateek commented on code in PR #4093: URL: https://github.com/apache/gobblin/pull/4093#discussion_r1968820936 ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java: ########## @@ -84,4 +84,34 @@ public interface GobblinTemporalConfigurationKeys { String TEMPORAL_METRICS_REPORT_INTERVAL_SECS = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds"; int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10; String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions"; + + /** + * Activities timeout configs + */ + String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = PREFIX + "activity.heartbeat.timeout.minutes"; + int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = 5; + String TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = PREFIX + "activity.heartbeat.interval.minutes"; + int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = 1; + String ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = "activity.starttoclose.timeout.minutes"; + int DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 360; + String TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = + PREFIX + "generate.workunits." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; + String TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = + PREFIX + "recommend.scaling." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; + String TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = + PREFIX + "delete.work.dirs." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; + String TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = + PREFIX + "process.workunit." 
+ ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; + String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = + PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; + String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options"; + String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds"; + int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3; + String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.interval.seconds"; + int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 100; + String TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "backoff.coefficient"; + int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 2; Review Comment: the `backoffCoefficient` is typically defined as a double.. `RetryOptions` class also takes input as double ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.ddm.activity; + +import java.time.Duration; +import java.util.Properties; + +import lombok.Getter; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.util.PropertiesUtils; + + +/** + * Enum representing different types of activities in the Temporal workflow. + * Each activity type corresponds to a specific operation that can be performed. + */ +public enum ActivityType { + /** Activity type for generating work units. */ + GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for recommending scaling operations. */ + RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for deleting work directories. */ + DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for processing a work unit. */ + PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for committing step. */ + COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Default placeholder activity type. 
*/ + DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES); + + @Getter private final String startToCloseTimeoutConfigKey; + + ActivityType(String startToCloseTimeoutConfigKey) { + this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey; + } + + public ActivityOptions buildActivityOptions(Properties props) { + return ActivityOptions.newBuilder() + .setStartToCloseTimeout(getStartToCloseTimeout(props)) + .setHeartbeatTimeout(getHeartbeatTimeout(props)) + .setRetryOptions(buildRetryOptions(props)) + .build(); + } + + private Duration getStartToCloseTimeout(Properties props) { + return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, this.startToCloseTimeoutConfigKey, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES)); + } + + private Duration getHeartbeatTimeout(Properties props) { + return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, + GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES)); + } + + private RetryOptions buildRetryOptions(Properties props) { + return RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props, + GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS))) + .setMaximumInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props, + GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS))) + .setBackoffCoefficient(PropertiesUtils.getPropAsInt(props, Review Comment: use `getPropAsDouble` instead of `getPropAsInt` as per above change of using double ########## 
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import io.temporal.activity.ActivityOptions; + +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; + + +/** Tests for {@link ActivityType} */ +public class ActivityTypeTest { + + private Properties props; + private final List<ActivityType> activityTypes = Arrays.asList(ActivityType.values()); + + @BeforeMethod + public void setUp() { + props = new Properties(); + } + + @Test + public void testDefaultValuesForTimeouts() { + activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props)).forEach(activityOptions -> { + Assert.assertEquals(activityOptions.getStartToCloseTimeout(), + Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES)); + 
Assert.assertEquals(activityOptions.getHeartbeatTimeout(), + Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES)); + Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), + Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS)); + Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), + Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS)); + Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), + (double) GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT); + Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS); + }); + } + + @DataProvider(name = "activityTypesWithStartToCloseTimeout") + public Object[][] activityTypesWithStartToCloseTimeout() { + return new Object[][] { + {ActivityType.GENERATE_WORKUNITS, 333}, + {ActivityType.RECOMMEND_SCALING, 111}, + {ActivityType.DELETE_WORK_DIRS, 222}, + {ActivityType.PROCESS_WORKUNIT, 555}, + {ActivityType.COMMIT, 444}, + {ActivityType.DEFAULT_ACTIVITY, 1} + }; + } + + @Test(dataProvider = "activityTypesWithStartToCloseTimeout") + public void testStartToCloseTimeout(ActivityType activityType, int expectedTimeout) { + props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), Integer.toString(expectedTimeout)); + Assert.assertEquals(activityType.buildActivityOptions(props).getStartToCloseTimeout(), Duration.ofMinutes(expectedTimeout)); + } + + @Test + public void testHeartBeatTimeout() { + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES, "14"); + activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props)).forEach(activityOptions -> { + 
Assert.assertEquals(activityOptions.getHeartbeatTimeout(), Duration.ofMinutes(14)); + }); + } + + @Test + public void testRetryOptions() { + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS, "115"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS, "5550"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT, "7"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS, "21"); + + activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props)).forEach(activityOptions -> { + Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), Duration.ofSeconds(115)); + Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), Duration.ofSeconds(5550)); + Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), 7.0); + Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21); + }); + } + + @Test(dataProvider = "activityTypesWithStartToCloseTimeout") + public void testBuildActivityOptions(ActivityType activityType, int expectedTimeout) { + props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), Integer.toString(expectedTimeout)); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES, "144"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS, "115"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS, "5550"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT, "7"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS, "21"); + + ActivityOptions activityOptions = 
activityType.buildActivityOptions(props); + + Assert.assertEquals(activityOptions.getStartToCloseTimeout(), Duration.ofMinutes(expectedTimeout)); + Assert.assertEquals(activityOptions.getHeartbeatTimeout(), Duration.ofMinutes(144)); + Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), Duration.ofSeconds(115)); + Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), Duration.ofSeconds(5550)); + Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), 7.0); Review Comment: since double values have chances of rounding issues, include a delta also in the assertion `(..., 7.0, .001)` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java: ########## @@ -81,58 +81,20 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow { public static final String PROCESS_WORKFLOW_ID_BASE = "ProcessWorkUnits"; - public static final Duration genWUsStartToCloseTimeout = Duration.ofHours(2); // TODO: make configurable... 
also add activity heartbeats - - private static final RetryOptions GEN_WUS_ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions GEN_WUS_ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(genWUsStartToCloseTimeout) - .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS) - .build(); - - private final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class, GEN_WUS_ACTIVITY_OPTS); - - private static final RetryOptions RECOMMEND_SCALING_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions RECOMMEND_SCALING_ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofMinutes(5)) - .setRetryOptions(RECOMMEND_SCALING_RETRY_OPTS) - .build(); - private final RecommendScalingForWorkUnits recommendScalingStub = Workflow.newActivityStub(RecommendScalingForWorkUnits.class, - RECOMMEND_SCALING_ACTIVITY_OPTS); - - private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofMinutes(10)) - .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS) - .build(); - private final DeleteWorkDirsActivity deleteWorkDirsActivityStub = Workflow.newActivityStub(DeleteWorkDirsActivity.class, DELETE_WORK_DIRS_ACTIVITY_OPTS); - @Override public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) { 
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext); timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME` EventTimer jobSuccessTimer = timerFactory.createJobTimer(); Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult = Optional.empty(); WUProcessingSpec wuSpec = createProcessingSpec(jobProps, eventSubmitterContext); + // Filtering only temporal job properties to pass to child workflows to avoid passing unnecessary properties + final Properties temporalJobProps = PropertiesUtils.extractPropertiesWithPrefix(jobProps, + com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX)); Review Comment: import `com.google.common.base.Optional` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.ddm.activity; + +import java.time.Duration; +import java.util.Properties; + +import lombok.Getter; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.util.PropertiesUtils; + + +/** + * Enum representing different types of activities in the Temporal workflow. + * Each activity type corresponds to a specific operation that can be performed. + */ +public enum ActivityType { + /** Activity type for generating work units. */ + GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for recommending scaling operations. */ + RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for deleting work directories. */ + DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for processing a work unit. */ + PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for committing step. */ + COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Default placeholder activity type. 
*/ + DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES); + + @Getter private final String startToCloseTimeoutConfigKey; + + ActivityType(String startToCloseTimeoutConfigKey) { + this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey; + } + + public ActivityOptions buildActivityOptions(Properties props) { + return ActivityOptions.newBuilder() + .setStartToCloseTimeout(getStartToCloseTimeout(props)) + .setHeartbeatTimeout(getHeartbeatTimeout(props)) + .setRetryOptions(buildRetryOptions(props)) + .build(); + } + + private Duration getStartToCloseTimeout(Properties props) { + return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, this.startToCloseTimeoutConfigKey, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES)); + } + + private Duration getHeartbeatTimeout(Properties props) { + return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, + GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES)); + } + + private RetryOptions buildRetryOptions(Properties props) { + return RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props, Review Comment: we can check that the initial interval is not greater than max interval ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java: ########## @@ -74,7 +74,7 @@ public interface GobblinTemporalConfigurationKeys { String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + "polling.interval.seconds"; int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; - + Review Comment: nit: remove space Issue Time Tracking ------------------- Worklog Id: (was: 958615) Time Spent: 2h 40m (was: 2.5h) > Implement ActivityTimeoutStrategy for all Temporal Activities > ------------------------------------------------------------- > > 
Key: GOBBLIN-2190 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2190 > Project: Apache Gobblin > Issue Type: Improvement > Reporter: Vivek Rai > Priority: Major > Time Spent: 2h 40m > Remaining Estimate: 0h > > Currently, the timeouts of all Temporal Activities are hardcoded and can't be changed > during runtime; change them to make them configurable. -- This message was sent by Atlassian Jira (v8.20.10#820010)