[ https://issues.apache.org/jira/browse/GOBBLIN-2190?focusedWorklogId=955091&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-955091 ]
ASF GitHub Bot logged work on GOBBLIN-2190: ------------------------------------------- Author: ASF GitHub Bot Created on: 03/Feb/25 04:37 Start Date: 03/Feb/25 04:37 Worklog Time Spent: 10m Work Description: vsinghal85 commented on code in PR #4093: URL: https://github.com/apache/gobblin/pull/4093#discussion_r1938734553 ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java: ########## @@ -17,39 +17,26 @@ package org.apache.gobblin.temporal.ddm.workflow.impl; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; +import java.util.Properties; + import io.temporal.workflow.Async; import io.temporal.workflow.Promise; import io.temporal.workflow.Workflow; -import java.time.Duration; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit; +import org.apache.gobblin.temporal.ddm.util.TemporalActivityUtils; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; /** {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link ProcessWorkUnit} */ public class NestingExecOfProcessWorkUnitWorkflowImpl extends AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> { - public static final Duration processWorkUnitStartToCloseTimeout = Duration.ofHours(3); // TODO: make configurable... also add activity heartbeats - - // RetryOptions specify how to automatically handle retries when Activities fail. - private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(processWorkUnitStartToCloseTimeout) - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final ProcessWorkUnit activityStub = Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS); @Override - protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu) { - return Async.function(activityStub::processWorkUnit, wu); + protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu, final Properties props) { + final ProcessWorkUnit processWorkUnitStub = Workflow.newActivityStub(ProcessWorkUnit.class, TemporalActivityUtils.buildActivityOptions( + ActivityType.PROCESS_WORKUNIT, props)); Review Comment: Add logging just before to state, we are about to invoke async activity ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java: ########## @@ -17,39 +17,26 @@ package org.apache.gobblin.temporal.ddm.workflow.impl; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; +import java.util.Properties; + import io.temporal.workflow.Async; import io.temporal.workflow.Promise; import io.temporal.workflow.Workflow; -import java.time.Duration; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit; +import org.apache.gobblin.temporal.ddm.util.TemporalActivityUtils; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; /** {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link ProcessWorkUnit} */ public class NestingExecOfProcessWorkUnitWorkflowImpl extends AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> { - public static final Duration processWorkUnitStartToCloseTimeout = Duration.ofHours(3); // TODO: make configurable... also add activity heartbeats - - // RetryOptions specify how to automatically handle retries when Activities fail. - private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(processWorkUnitStartToCloseTimeout) - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final ProcessWorkUnit activityStub = Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS); @Override - protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu) { - return Async.function(activityStub::processWorkUnit, wu); + protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu, final Properties props) { + final ProcessWorkUnit processWorkUnitStub = Workflow.newActivityStub(ProcessWorkUnit.class, TemporalActivityUtils.buildActivityOptions( + ActivityType.PROCESS_WORKUNIT, props)); + return Async.function(processWorkUnitStub::processWorkUnit, wu); Review Comment: Once this Async function execution concludes, add appropriate logs in case of success or exception message in case of failure. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity; + +import java.time.Duration; +import java.util.Properties; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.util.PropertiesUtils; + + +/** + * Interface for defining timeout strategies for different Temporal activities. Review Comment: [NIT] timeout strategies -> "activity configuration strategies" ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalActivityUtils.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.util; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import com.google.common.annotations.VisibleForTesting; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import lombok.experimental.UtilityClass; + +import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; + + +/** Utility class for handling Temporal Activity related operations. */ +@UtilityClass +public class TemporalActivityUtils { + + @VisibleForTesting + protected static final RetryOptions DEFAULT_RETRY_OPTIONS = RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(3)) + .setMaximumInterval(Duration.ofSeconds(100)) + .setBackoffCoefficient(2) + .setMaximumAttempts(4) + .build(); + + private static final Map<ActivityType, ActivityConfigurationStrategy> activityConfigurationStrategies = new HashMap<>(); + + static { + activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy()); + activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy()); + activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy()); + activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy()); + activityConfigurationStrategies.put(ActivityType.COMMIT, new ActivityConfigurationStrategy.CommitActivityConfigurationStrategy()); + } + + /** + * Builds and returns an {@link ActivityOptions} object configured with the specified {@link ActivityType} and properties. + * + * @param activityType the type of the activity for which the options are being built. + * @param props the properties to be used for configuring the activity options. + * @return an {@link ActivityOptions} object configured with the specified activity type and properties. + */ + public static ActivityOptions buildActivityOptions(ActivityType activityType, Properties props) { + return ActivityOptions.newBuilder() + .setStartToCloseTimeout(getStartToCloseTimeout(activityType, props)) + .setRetryOptions(buildRetryOptions(activityType, props)) + .build(); + } + + /** + * Retrieves the start to close timeout duration for a given {@link ActivityType} based on the provided properties. + * + * @param activityType the type of the activity for which the start to close timeout is being retrieved. + * @param props the properties to be used for configuring the timeout. + * @return the start to close timeout duration for the specified activity type. + */ + private static Duration getStartToCloseTimeout(ActivityType activityType, Properties props) { + ActivityConfigurationStrategy activityConfigurationStrategy = activityConfigurationStrategies.get(activityType); + if (activityConfigurationStrategy == null) { + return ActivityConfigurationStrategy.defaultStartToCloseTimeout; Review Comment: Let's add log to track this scenario ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java: ########## @@ -42,22 +42,10 @@ @Slf4j public class CommitStepWorkflowImpl implements CommitStepWorkflow { - private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofHours(3)) // TODO: make configurable... also add activity heartbeats - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS); - @Override - public CommitStats commit(WUProcessingSpec workSpec) { + public CommitStats commit(WUProcessingSpec workSpec, final Properties props) { + final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, TemporalActivityUtils.buildActivityOptions( Review Comment: Would suggest to have this variable inside the class as instance variable as this can be used for CommitStepWorkflow across different methods of this class. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -54,37 +54,30 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow { public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow"; @Override - public CommitStats process(WUProcessingSpec workSpec) { + public CommitStats process(WUProcessingSpec workSpec, final Properties props) { Review Comment: Can we add unit tests for this class? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity; + +import java.time.Duration; +import java.util.Properties; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.util.PropertiesUtils; + + +/** + * Interface for defining timeout strategies for different Temporal activities. + * Each strategy provides a method to retrieve the timeout duration based on the provided properties. Review Comment: change as per scope of new class to something like -> "Each strategy provides a method to retrieve configuration details, such as timeout duration, based on the provided properties." ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java: ########## @@ -84,9 +90,11 @@ public int performWorkload( if (numDirectLeavesChildMayHave > 0) { Workflow.sleep(calcPauseDurationBeforeCreatingSubTree(numDirectLeavesChildMayHave)); } - childSubTrees.add( - Async.function(child::performWorkload, childAddr, workload, childStartIndex, maxBranchesPerTree, - maxSubTreesPerTree, Optional.of(subTreeChildMaxSubTreesPerTree))); + NestingExecWorkloadInput<WORK_ITEM> childInput = new NestingExecWorkloadInput<>( Review Comment: Can we add unit tests for this? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java: ########## @@ -32,24 +34,11 @@ public class NestingExecOfIllustrationItemActivityWorkflowImpl extends AbstractNestingExecWorkflowImpl<IllustrationItem, String> { - // RetryOptions specify how to automatically handle retries when Activities fail. - private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(1)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(3) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofSeconds(10)) - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final IllustrationItemActivity activityStub = - Workflow.newActivityStub(IllustrationItemActivity.class, ACTIVITY_OPTS); - @Override - protected Promise<String> launchAsyncActivity(final IllustrationItem item) { + protected Promise<String> launchAsyncActivity(final IllustrationItem item, final Properties props) { + final IllustrationItemActivity activityStub = + Workflow.newActivityStub(IllustrationItemActivity.class, + TemporalActivityUtils.buildActivityOptions(ActivityType.DEFAULT_ACTIVITY, props)); Review Comment: Any particular reason why we are using default activity here? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java: ########## @@ -17,39 +17,26 @@ package org.apache.gobblin.temporal.ddm.workflow.impl; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; +import java.util.Properties; + import io.temporal.workflow.Async; import io.temporal.workflow.Promise; import io.temporal.workflow.Workflow; -import java.time.Duration; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit; +import org.apache.gobblin.temporal.ddm.util.TemporalActivityUtils; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; /** {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link ProcessWorkUnit} */ public class NestingExecOfProcessWorkUnitWorkflowImpl extends AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> { - public static final Duration processWorkUnitStartToCloseTimeout = Duration.ofHours(3); // TODO: make configurable... also add activity heartbeats - - // RetryOptions specify how to automatically handle retries when Activities fail. - private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(processWorkUnitStartToCloseTimeout) - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final ProcessWorkUnit activityStub = Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS); @Override - protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu) { - return Async.function(activityStub::processWorkUnit, wu); + protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu, final Properties props) { Review Comment: Do we plan to add unit tests for this? Issue Time Tracking ------------------- Worklog Id: (was: 955091) Remaining Estimate: 0h Time Spent: 10m > Implement ActivityTimeoutStrategy for all Temporal Activities > ------------------------------------------------------------- > > Key: GOBBLIN-2190 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2190 > Project: Apache Gobblin > Issue Type: Improvement > Reporter: Vivek Rai > Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > Currently TImeouts of all Temporal Activity are hardcoded and cant change > during runtime, change those to make them configurable. -- This message was sent by Atlassian Jira (v8.20.10#820010)