vsinghal85 commented on code in PR #4093: URL: https://github.com/apache/gobblin/pull/4093#discussion_r1938734553
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java: ########## @@ -17,39 +17,26 @@ package org.apache.gobblin.temporal.ddm.workflow.impl; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; +import java.util.Properties; + import io.temporal.workflow.Async; import io.temporal.workflow.Promise; import io.temporal.workflow.Workflow; -import java.time.Duration; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit; +import org.apache.gobblin.temporal.ddm.util.TemporalActivityUtils; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; /** {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link ProcessWorkUnit} */ public class NestingExecOfProcessWorkUnitWorkflowImpl extends AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> { - public static final Duration processWorkUnitStartToCloseTimeout = Duration.ofHours(3); // TODO: make configurable... also add activity heartbeats - - // RetryOptions specify how to automatically handle retries when Activities fail. 
- private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(processWorkUnitStartToCloseTimeout) - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final ProcessWorkUnit activityStub = Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS); @Override - protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu) { - return Async.function(activityStub::processWorkUnit, wu); + protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu, final Properties props) { + final ProcessWorkUnit processWorkUnitStub = Workflow.newActivityStub(ProcessWorkUnit.class, TemporalActivityUtils.buildActivityOptions( + ActivityType.PROCESS_WORKUNIT, props)); Review Comment: Add a log statement just before this line, stating that we are about to invoke the async activity. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java: ########## @@ -17,39 +17,26 @@ package org.apache.gobblin.temporal.ddm.workflow.impl; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; +import java.util.Properties; + import io.temporal.workflow.Async; import io.temporal.workflow.Promise; import io.temporal.workflow.Workflow; -import java.time.Duration; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit; +import org.apache.gobblin.temporal.ddm.util.TemporalActivityUtils; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; /** {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link 
ProcessWorkUnit} */ public class NestingExecOfProcessWorkUnitWorkflowImpl extends AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> { - public static final Duration processWorkUnitStartToCloseTimeout = Duration.ofHours(3); // TODO: make configurable... also add activity heartbeats - - // RetryOptions specify how to automatically handle retries when Activities fail. - private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(processWorkUnitStartToCloseTimeout) - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final ProcessWorkUnit activityStub = Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS); @Override - protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu) { - return Async.function(activityStub::processWorkUnit, wu); + protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu, final Properties props) { + final ProcessWorkUnit processWorkUnitStub = Workflow.newActivityStub(ProcessWorkUnit.class, TemporalActivityUtils.buildActivityOptions( + ActivityType.PROCESS_WORKUNIT, props)); + return Async.function(processWorkUnitStub::processWorkUnit, wu); Review Comment: Once this Async function execution concludes, add appropriate logs in case of success or exception message in case of failure. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity; + +import java.time.Duration; +import java.util.Properties; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.util.PropertiesUtils; + + +/** + * Interface for defining timeout strategies for different Temporal activities. Review Comment: [NIT] timeout strategies -> "activity configuration strategies" ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalActivityUtils.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.ddm.util; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import com.google.common.annotations.VisibleForTesting; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import lombok.experimental.UtilityClass; + +import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; + + +/** Utility class for handling Temporal Activity related operations. */ +@UtilityClass +public class TemporalActivityUtils { + + @VisibleForTesting + protected static final RetryOptions DEFAULT_RETRY_OPTIONS = RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(3)) + .setMaximumInterval(Duration.ofSeconds(100)) + .setBackoffCoefficient(2) + .setMaximumAttempts(4) + .build(); + + private static final Map<ActivityType, ActivityConfigurationStrategy> activityConfigurationStrategies = new HashMap<>(); + + static { + activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy()); + activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy()); + activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy()); + activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy()); + activityConfigurationStrategies.put(ActivityType.COMMIT, new ActivityConfigurationStrategy.CommitActivityConfigurationStrategy()); + } + + /** + * Builds and returns an {@link ActivityOptions} object configured with the specified {@link ActivityType} and properties. 
+ * + * @param activityType the type of the activity for which the options are being built. + * @param props the properties to be used for configuring the activity options. + * @return an {@link ActivityOptions} object configured with the specified activity type and properties. + */ + public static ActivityOptions buildActivityOptions(ActivityType activityType, Properties props) { + return ActivityOptions.newBuilder() + .setStartToCloseTimeout(getStartToCloseTimeout(activityType, props)) + .setRetryOptions(buildRetryOptions(activityType, props)) + .build(); + } + + /** + * Retrieves the start to close timeout duration for a given {@link ActivityType} based on the provided properties. + * + * @param activityType the type of the activity for which the start to close timeout is being retrieved. + * @param props the properties to be used for configuring the timeout. + * @return the start to close timeout duration for the specified activity type. + */ + private static Duration getStartToCloseTimeout(ActivityType activityType, Properties props) { + ActivityConfigurationStrategy activityConfigurationStrategy = activityConfigurationStrategies.get(activityType); + if (activityConfigurationStrategy == null) { + return ActivityConfigurationStrategy.defaultStartToCloseTimeout; Review Comment: Let's add a log statement to track this scenario (no configuration strategy is registered for the activity type, so the default timeout is returned). ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java: ########## @@ -42,22 +42,10 @@ @Slf4j public class CommitStepWorkflowImpl implements CommitStepWorkflow { - private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofHours(3)) // TODO: make configurable... 
also add activity heartbeats - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS); - @Override - public CommitStats commit(WUProcessingSpec workSpec) { + public CommitStats commit(WUProcessingSpec workSpec, final Properties props) { + final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, TemporalActivityUtils.buildActivityOptions( Review Comment: I would suggest keeping this variable in the class as an instance variable, since it can then be used for CommitStepWorkflow across the different methods of this class. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java: ########## @@ -54,37 +54,30 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow { public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow"; @Override - public CommitStats process(WUProcessingSpec workSpec) { + public CommitStats process(WUProcessingSpec workSpec, final Properties props) { Review Comment: Can we add unit tests for this class? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity; + +import java.time.Duration; +import java.util.Properties; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.util.PropertiesUtils; + + +/** + * Interface for defining timeout strategies for different Temporal activities. + * Each strategy provides a method to retrieve the timeout duration based on the provided properties. Review Comment: change as per scope of new class to something like -> "Each strategy provides a method to retrieve configuration details, such as timeout duration, based on the provided properties." ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java: ########## @@ -84,9 +90,11 @@ public int performWorkload( if (numDirectLeavesChildMayHave > 0) { Workflow.sleep(calcPauseDurationBeforeCreatingSubTree(numDirectLeavesChildMayHave)); } - childSubTrees.add( - Async.function(child::performWorkload, childAddr, workload, childStartIndex, maxBranchesPerTree, - maxSubTreesPerTree, Optional.of(subTreeChildMaxSubTreesPerTree))); + NestingExecWorkloadInput<WORK_ITEM> childInput = new NestingExecWorkloadInput<>( Review Comment: Can we add unit tests for this? 
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java: ########## @@ -32,24 +34,11 @@ public class NestingExecOfIllustrationItemActivityWorkflowImpl extends AbstractNestingExecWorkflowImpl<IllustrationItem, String> { - // RetryOptions specify how to automatically handle retries when Activities fail. - private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(1)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(3) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofSeconds(10)) - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final IllustrationItemActivity activityStub = - Workflow.newActivityStub(IllustrationItemActivity.class, ACTIVITY_OPTS); - @Override - protected Promise<String> launchAsyncActivity(final IllustrationItem item) { + protected Promise<String> launchAsyncActivity(final IllustrationItem item, final Properties props) { + final IllustrationItemActivity activityStub = + Workflow.newActivityStub(IllustrationItemActivity.class, + TemporalActivityUtils.buildActivityOptions(ActivityType.DEFAULT_ACTIVITY, props)); Review Comment: Any particular reason why we are using default activity here? 
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java: ########## @@ -17,39 +17,26 @@ package org.apache.gobblin.temporal.ddm.workflow.impl; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; +import java.util.Properties; + import io.temporal.workflow.Async; import io.temporal.workflow.Promise; import io.temporal.workflow.Workflow; -import java.time.Duration; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit; +import org.apache.gobblin.temporal.ddm.util.TemporalActivityUtils; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; /** {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link ProcessWorkUnit} */ public class NestingExecOfProcessWorkUnitWorkflowImpl extends AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> { - public static final Duration processWorkUnitStartToCloseTimeout = Duration.ofHours(3); // TODO: make configurable... also add activity heartbeats - - // RetryOptions specify how to automatically handle retries when Activities fail. 
- private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(processWorkUnitStartToCloseTimeout) - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final ProcessWorkUnit activityStub = Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS); @Override - protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu) { - return Async.function(activityStub::processWorkUnit, wu); + protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu, final Properties props) { Review Comment: Do we plan to add unit tests for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org