This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f2bcdc7329 [GOBBLIN-2190] Implement ActivityType & add HeartBeat for
Temporal Activities (#4093)
f2bcdc7329 is described below
commit f2bcdc7329dedd5a0d19fb923f60de278ed92d6c
Author: Vivek Rai <[email protected]>
AuthorDate: Tue Feb 25 14:19:41 2025 +0530
[GOBBLIN-2190] Implement ActivityType & add HeartBeat for Temporal
Activities (#4093)
Made StartToClose timeout configurable and added heartbeat
---
.../temporal/GobblinTemporalConfigurationKeys.java | 30 +++++
.../temporal/ddm/activity/ActivityType.java | 101 +++++++++++++++++
.../ddm/activity/impl/CommitActivityImpl.java | 16 +++
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 15 +++
.../ddm/activity/impl/ProcessWorkUnitImpl.java | 14 +++
.../ddm/launcher/ProcessWorkUnitsJobLauncher.java | 2 +-
.../gobblin/temporal/ddm/util/JobStateUtils.java | 6 +
.../temporal/ddm/work/WUProcessingSpec.java | 5 +-
.../temporal/ddm/workflow/CommitStepWorkflow.java | 4 +-
.../ddm/workflow/ProcessWorkUnitsWorkflow.java | 4 +-
.../ddm/workflow/impl/CommitStepWorkflowImpl.java | 22 +---
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 61 +++-------
.../NestingExecOfProcessWorkUnitWorkflowImpl.java | 28 ++---
.../impl/ProcessWorkUnitsWorkflowImpl.java | 35 +++---
.../launcher/GenArbitraryLoadJobLauncher.java | 5 +-
...ExecOfIllustrationItemActivityWorkflowImpl.java | 27 ++---
.../nesting/work/NestingExecWorkloadInput.java | 43 +++++++
.../workflow/AbstractNestingExecWorkflowImpl.java | 34 +++---
.../util/nesting/workflow/NestingExecWorkflow.java | 13 +--
.../temporal/ddm/activity/ActivityTypeTest.java | 124 +++++++++++++++++++++
.../org/apache/gobblin/util/PropertiesUtils.java | 8 ++
.../apache/gobblin/util/PropertiesUtilsTest.java | 16 +++
22 files changed, 455 insertions(+), 158 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 82ea646090..f2ebcc31ea 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -84,4 +84,34 @@ public interface GobblinTemporalConfigurationKeys {
String TEMPORAL_METRICS_REPORT_INTERVAL_SECS =
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds";
int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10;
String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY =
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions";
+
+ /**
+ * Activities timeout configs
+ */
+ String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = PREFIX +
"activity.heartbeat.timeout.minutes";
+ int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = 5;
+ String TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = PREFIX +
"activity.heartbeat.interval.minutes";
+ int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = 1;
+ String ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
"activity.starttoclose.timeout.minutes";
+ int DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 360;
+ String TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "generate.workunits." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "recommend.scaling." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "delete.work.dirs." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options";
+ String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS =
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
+ int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
+ String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS =
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.interval.seconds";
+ int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 100;
+ String TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT =
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "backoff.coefficient";
+ double DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 2;
+ String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS =
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts";
+ int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4;
+
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
new file mode 100644
index 0000000000..7c796ebe35
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import lombok.Getter;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Enum representing different types of activities in the Temporal workflow.
+ * Each activity type corresponds to a specific operation that can be
performed.
+ */
+public enum ActivityType {
+ /** Activity type for generating work units. */
+
GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for recommending scaling operations. */
+
RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for deleting work directories. */
+
DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for processing a work unit. */
+
PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for committing step. */
+
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Default placeholder activity type. */
+
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
+
+ @Getter private final String startToCloseTimeoutConfigKey;
+
+ ActivityType(String startToCloseTimeoutConfigKey) {
+ this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey;
+ }
+
+ public ActivityOptions buildActivityOptions(Properties props) {
+ return ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(getStartToCloseTimeout(props))
+ .setHeartbeatTimeout(getHeartbeatTimeout(props))
+ .setRetryOptions(buildRetryOptions(props))
+ .build();
+ }
+
+ private Duration getStartToCloseTimeout(Properties props) {
+ return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
this.startToCloseTimeoutConfigKey,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+ }
+
+ private Duration getHeartbeatTimeout(Properties props) {
+ return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
+
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+ }
+
+ private RetryOptions buildRetryOptions(Properties props) {
+ int maximumIntervalSeconds = PropertiesUtils.getPropAsInt(props,
+
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS);
+
+ int initialIntervalSeconds = Math.min(PropertiesUtils.getPropAsInt(props,
+
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS),
+ maximumIntervalSeconds);
+
+ return RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(initialIntervalSeconds))
+ .setMaximumInterval(Duration.ofSeconds(maximumIntervalSeconds))
+ .setBackoffCoefficient(PropertiesUtils.getPropAsDouble(props,
+
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT))
+ .setMaximumAttempts(PropertiesUtils.getPropAsInt(props,
+
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS))
+ .build();
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index b43c079c10..b106cdc153 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -27,6 +27,9 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -41,6 +44,9 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.io.Closer;
+
+import io.temporal.activity.Activity;
+import io.temporal.activity.ActivityExecutionContext;
import io.temporal.failure.ApplicationFailure;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
@@ -85,12 +91,21 @@ public class CommitActivityImpl implements CommitActivity {
@Override
public CommitStats commit(WUProcessingSpec workSpec) {
+ ActivityExecutionContext activityExecutionContext =
Activity.getExecutionContext();
+ ScheduledExecutorService heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(
+
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+
com.google.common.base.Optional.of("CommitActivityHeartBeatExecutor")));
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
Optional<String> optJobName = Optional.empty();
AutomaticTroubleshooter troubleshooter = null;
try (FileSystem fs = Help.loadFileSystem(workSpec)) {
JobState jobState = Help.loadJobState(workSpec, fs);
+
+ int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
+ heartBeatExecutor.scheduleAtFixedRate(() ->
activityExecutionContext.heartbeat("Running Commit Activity"),
+ heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);
+
optJobName = Optional.ofNullable(jobState.getJobName());
SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
troubleshooter =
AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
@@ -147,6 +162,7 @@ public class CommitActivityImpl implements CommitActivity {
String errCorrelator = String.format("Commit [%s]",
calcCommitId(workSpec));
EventSubmitter eventSubmitter =
workSpec.getEventSubmitterContext().create();
Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log,
errCorrelator);
+ ExecutorsUtils.shutdownExecutorService(heartBeatExecutor,
com.google.common.base.Optional.of(log));
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 33f60bd779..fdeaf42487 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -24,6 +24,9 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +41,8 @@ import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import com.tdunning.math.stats.TDigest;
import io.temporal.failure.ApplicationFailure;
+import io.temporal.activity.Activity;
+import io.temporal.activity.ActivityExecutionContext;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -66,6 +71,7 @@ import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.writer.initializer.WriterInitializer;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -122,11 +128,19 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
@Override
public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps,
EventSubmitterContext eventSubmitterContext) {
+ ActivityExecutionContext activityExecutionContext =
Activity.getExecutionContext();
+ ScheduledExecutorService heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(
+
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+
com.google.common.base.Optional.of("GenerateWorkUnitsActivityHeartBeatExecutor")));
// TODO: decide whether to acquire a job lock (as MR did)!
// TODO: provide for job cancellation (unless handling at the
temporal-level of parent workflows)!
JobState jobState = new JobState(jobProps);
log.info("Created jobState: {}", jobState.toJsonString(true));
+ int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
+ heartBeatExecutor.scheduleAtFixedRate(() ->
activityExecutionContext.heartbeat("Running GenerateWorkUnits"),
+ heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);
+
Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
log.info("Using work dir root path for job '{}' - '{}'",
jobState.getJobId(), workDirRoot);
@@ -177,6 +191,7 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
} finally {
EventSubmitter eventSubmitter = eventSubmitterContext.create();
Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log,
jobState.getJobId());
+ ExecutorsUtils.shutdownExecutorService(heartBeatExecutor,
com.google.common.base.Optional.of(log));
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index a6753245b7..9450bc7bcb 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -21,6 +21,9 @@ import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -28,6 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.Lists;
+import io.temporal.activity.Activity;
+import io.temporal.activity.ActivityExecutionContext;
import lombok.extern.slf4j.Slf4j;
@@ -56,6 +61,7 @@ import
org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JobLauncherUtils;
@@ -68,6 +74,10 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
@Override
public int processWorkUnit(WorkUnitClaimCheck wu) {
+ ActivityExecutionContext activityExecutionContext =
Activity.getExecutionContext();
+ ScheduledExecutorService heartBeatExecutor =
Executors.newSingleThreadScheduledExecutor(
+
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+
com.google.common.base.Optional.of("CommitActivityHeartBeatExecutor")));
AutomaticTroubleshooter troubleshooter = null;
EventSubmitter eventSubmitter = wu.getEventSubmitterContext().create();
String correlator = String.format("(M)WU [%s]", wu.getCorrelator());
@@ -75,6 +85,9 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
log.info("{} - loaded; found {} workUnits", correlator,
workUnits.size());
JobState jobState = Help.loadJobState(wu, fs);
+ int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
+ heartBeatExecutor.scheduleAtFixedRate(() ->
activityExecutionContext.heartbeat("Running ProcessWorkUnit Activity"),
+ heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);
troubleshooter =
AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
troubleshooter.start();
return execute(workUnits, wu, jobState, fs,
troubleshooter.getIssueRepository());
@@ -82,6 +95,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
throw new RuntimeException(e);
} finally {
Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log,
correlator);
+ ExecutorsUtils.shutdownExecutorService(heartBeatExecutor,
com.google.common.base.Optional.of(log));
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 709a9cf935..954269948f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -112,7 +112,7 @@ public class ProcessWorkUnitsJobLauncher extends
GobblinTemporalJobLauncher {
Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec,
Help.loadFileSystem(wuSpec)));
ProcessWorkUnitsWorkflow workflow =
this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
- workflow.process(wuSpec);
+ workflow.process(wuSpec, jobProps);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
index e606601956..be31d3ec4f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -46,6 +46,7 @@ import org.apache.gobblin.runtime.SourceDecorator;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.JobLauncherUtils;
@@ -238,4 +239,9 @@ public class JobStateUtils {
GobblinScopeTypes.GLOBAL.defaultScopeInstance());
return globalBroker.newSubscopedBuilder(new
JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build();
}
+
+ public static int getHeartBeatInterval(JobState jobState) {
+ return
jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES);
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index dfa207f66c..3a05e60f01 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -18,7 +18,6 @@
package org.apache.gobblin.temporal.ddm.work;
import java.net.URI;
-import java.util.Optional;
import org.apache.hadoop.fs.Path;
@@ -36,8 +35,6 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
-import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
-import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@@ -75,7 +72,7 @@ public class WUProcessingSpec implements FileSystemApt,
FileSystemJobStateful {
return new Path(new Path(workUnitsDir).getParent(),
AbstractJobLauncher.JOB_STATE_FILE_NAME);
}
- /** Configuration for {@link
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(WorkflowAddr,
Workload, int, int, int, Optional)}*/
+ /** Configuration for {@link
org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput} */
@Data
@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
index c536828520..e47c0a9b15 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.temporal.ddm.workflow;
+import java.util.Properties;
+
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
@@ -35,5 +37,5 @@ public interface CommitStepWorkflow {
* @return number of workunits committed
*/
@WorkflowMethod
- CommitStats commit(WUProcessingSpec workSpec);
+ CommitStats commit(WUProcessingSpec workSpec, Properties props);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
index a6018d41f1..8adfd34185 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.temporal.ddm.workflow;
+import java.util.Properties;
+
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
@@ -30,5 +32,5 @@ import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
public interface ProcessWorkUnitsWorkflow {
/** @return the number of {@link WorkUnit}s cumulatively processed
successfully */
@WorkflowMethod
- CommitStats process(WUProcessingSpec wuSpec);
+ CommitStats process(WUProcessingSpec workSpec, Properties props);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 0f28901883..b440658904 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -17,13 +17,11 @@
package org.apache.gobblin.temporal.ddm.workflow.impl;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
-import io.temporal.activity.ActivityOptions;
-import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;
@@ -31,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.DatasetTaskSummary;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
@@ -42,22 +41,9 @@ import
org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
@Slf4j
public class CommitStepWorkflowImpl implements CommitStepWorkflow {
- private static final RetryOptions ACTIVITY_RETRY_OPTS =
RetryOptions.newBuilder()
- .setInitialInterval(Duration.ofSeconds(3))
- .setMaximumInterval(Duration.ofSeconds(100))
- .setBackoffCoefficient(2)
- .setMaximumAttempts(4)
- .build();
-
- private static final ActivityOptions ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(Duration.ofHours(3)) // TODO: make
configurable... also add activity heartbeats
- .setRetryOptions(ACTIVITY_RETRY_OPTS)
- .build();
-
- private final CommitActivity activityStub =
Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS);
-
@Override
- public CommitStats commit(WUProcessingSpec workSpec) {
+ public CommitStats commit(WUProcessingSpec workSpec, final Properties props)
{
+ final CommitActivity activityStub =
Workflow.newActivityStub(CommitActivity.class,
ActivityType.COMMIT.buildActivityOptions(props));
CommitStats commitGobblinStats = activityStub.commit(workSpec);
if (!commitGobblinStats.getOptFailure().isPresent() ||
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext());
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 15184c6ccc..661847bdce 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -39,9 +39,7 @@ import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import io.temporal.activity.ActivityOptions;
import io.temporal.api.enums.v1.ParentClosePolicy;
-import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
@@ -50,6 +48,7 @@ import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
@@ -72,6 +71,7 @@ import
org.apache.gobblin.temporal.dynamic.ScalingDirectivesRecipient;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
@@ -81,49 +81,6 @@ import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
public static final String PROCESS_WORKFLOW_ID_BASE = "ProcessWorkUnits";
- public static final Duration genWUsStartToCloseTimeout =
Duration.ofHours(2); // TODO: make configurable... also add activity heartbeats
-
- private static final RetryOptions GEN_WUS_ACTIVITY_RETRY_OPTS =
RetryOptions.newBuilder()
- .setInitialInterval(Duration.ofSeconds(3))
- .setMaximumInterval(Duration.ofSeconds(100))
- .setBackoffCoefficient(2)
- .setMaximumAttempts(4)
- .build();
-
- private static final ActivityOptions GEN_WUS_ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(genWUsStartToCloseTimeout)
- .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS)
- .build();
-
- private final GenerateWorkUnits genWUsActivityStub =
Workflow.newActivityStub(GenerateWorkUnits.class, GEN_WUS_ACTIVITY_OPTS);
-
- private static final RetryOptions RECOMMEND_SCALING_RETRY_OPTS =
RetryOptions.newBuilder()
- .setInitialInterval(Duration.ofSeconds(3))
- .setMaximumInterval(Duration.ofSeconds(100))
- .setBackoffCoefficient(2)
- .setMaximumAttempts(4)
- .build();
-
- private static final ActivityOptions RECOMMEND_SCALING_ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(Duration.ofMinutes(5))
- .setRetryOptions(RECOMMEND_SCALING_RETRY_OPTS)
- .build();
- private final RecommendScalingForWorkUnits recommendScalingStub =
Workflow.newActivityStub(RecommendScalingForWorkUnits.class,
- RECOMMEND_SCALING_ACTIVITY_OPTS);
-
- private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS =
RetryOptions.newBuilder()
- .setInitialInterval(Duration.ofSeconds(3))
- .setMaximumInterval(Duration.ofSeconds(100))
- .setBackoffCoefficient(2)
- .setMaximumAttempts(4)
- .build();
-
- private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(Duration.ofMinutes(10))
- .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
- .build();
- private final DeleteWorkDirsActivity deleteWorkDirsActivityStub =
Workflow.newActivityStub(DeleteWorkDirsActivity.class,
DELETE_WORK_DIRS_ACTIVITY_OPTS);
-
@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
@@ -131,8 +88,13 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult =
Optional.empty();
WUProcessingSpec wuSpec = createProcessingSpec(jobProps,
eventSubmitterContext);
+ // Filtering only temporal job properties to pass to child workflows to
avoid passing unnecessary properties
+ final Properties temporalJobProps =
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
+
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
boolean isSuccessful = false;
try (Closer closer = Closer.create()) {
+ final GenerateWorkUnits genWUsActivityStub =
Workflow.newActivityStub(GenerateWorkUnits.class,
+
ActivityType.GENERATE_WORKUNITS.buildActivityOptions(temporalJobProps));
GenerateWorkUnitsResult generateWorkUnitResult =
genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
optGenerateWorkUnitResult = Optional.of(generateWorkUnitResult);
WorkUnitsSizeSummary wuSizeSummary =
generateWorkUnitResult.getWorkUnitsSizeSummary();
@@ -141,6 +103,8 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
CommitStats commitStats = CommitStats.createEmpty();
if (numWUsGenerated > 0) {
TimeBudget timeBudget =
calcWUProcTimeBudget(jobSuccessTimer.getStartTime(), wuSizeSummary, jobProps);
+ final RecommendScalingForWorkUnits recommendScalingStub =
Workflow.newActivityStub(RecommendScalingForWorkUnits.class,
+
ActivityType.RECOMMEND_SCALING.buildActivityOptions(temporalJobProps));
List<ScalingDirective> scalingDirectives =
recommendScalingStub.recommendScaling(wuSizeSummary,
generateWorkUnitResult.getSourceClass(), timeBudget, jobProps);
log.info("Recommended scaling to process WUs within {}: {}",
timeBudget, scalingDirectives);
@@ -156,7 +120,7 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
}
ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow(jobProps);
- commitStats = processWUsWorkflow.process(wuSpec);
+ commitStats = processWUsWorkflow.process(wuSpec, temporalJobProps);
numWUsCommitted = commitStats.getNumCommittedWorkUnits();
}
jobSuccessTimer.stop();
@@ -270,6 +234,11 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
return;
}
+ final DeleteWorkDirsActivity deleteWorkDirsActivityStub =
Workflow.newActivityStub(
+ DeleteWorkDirsActivity.class,
+
ActivityType.DELETE_WORK_DIRS.buildActivityOptions(jobState.getProperties())
+ );
+
DirDeletionResult dirDeletionResult =
deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean));
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
index 96591e6547..6f22e3dd8b 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java
@@ -17,13 +17,13 @@
package org.apache.gobblin.temporal.ddm.workflow.impl;
-import io.temporal.activity.ActivityOptions;
-import io.temporal.common.RetryOptions;
+import java.util.Properties;
+
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
-import java.time.Duration;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;
@@ -31,25 +31,11 @@ import
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWork
/** {@link
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for
{@link ProcessWorkUnit} */
public class NestingExecOfProcessWorkUnitWorkflowImpl extends
AbstractNestingExecWorkflowImpl<WorkUnitClaimCheck, Integer> {
- public static final Duration processWorkUnitStartToCloseTimeout =
Duration.ofHours(3); // TODO: make configurable... also add activity heartbeats
-
- // RetryOptions specify how to automatically handle retries when Activities
fail.
- private static final RetryOptions ACTIVITY_RETRY_OPTS =
RetryOptions.newBuilder()
- .setInitialInterval(Duration.ofSeconds(3))
- .setMaximumInterval(Duration.ofSeconds(100))
- .setBackoffCoefficient(2)
- .setMaximumAttempts(4)
- .build();
-
- private static final ActivityOptions ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(processWorkUnitStartToCloseTimeout)
- .setRetryOptions(ACTIVITY_RETRY_OPTS)
- .build();
-
- private final ProcessWorkUnit activityStub =
Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS);
@Override
- protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu) {
- return Async.function(activityStub::processWorkUnit, wu);
+ protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu,
final Properties props) {
+ final ProcessWorkUnit processWorkUnitStub =
Workflow.newActivityStub(ProcessWorkUnit.class,
+ ActivityType.PROCESS_WORKUNIT.buildActivityOptions(props));
+ return Async.function(processWorkUnitStub::processWorkUnit, wu);
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 7b3f171967..00def8a2ee 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -18,10 +18,10 @@ package org.apache.gobblin.temporal.ddm.workflow.impl;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
-import com.google.common.io.Closer;
import com.typesafe.config.ConfigFactory;
import io.temporal.api.enums.v1.ParentClosePolicy;
@@ -33,6 +33,7 @@ import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
+import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
@@ -45,7 +46,6 @@ import
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
-import org.apache.gobblin.runtime.JobState;
@Slf4j
@@ -54,37 +54,30 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
public static final String COMMIT_STEP_WORKFLOW_ID_BASE =
"CommitStepWorkflow";
@Override
- public CommitStats process(WUProcessingSpec workSpec) {
+ public CommitStats process(WUProcessingSpec workSpec, final Properties
props) {
Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec);
- CommitStats result = performWork(workSpec);
+ CommitStats result = performWork(workSpec, props);
timer.ifPresent(EventTimer::stop);
return result;
}
- private CommitStats performWork(WUProcessingSpec workSpec) {
+ private CommitStats performWork(WUProcessingSpec workSpec, final Properties
props) {
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
- Map<String, Object> searchAttributes;
- JobState jobState;
- try (Closer closer = Closer.create()) {
- jobState = Help.loadJobState(workSpec,
closer.register(Help.loadFileSystem(workSpec)));
- } catch (Exception e) {
- log.error("Error loading jobState", e);
- throw new RuntimeException("Error loading jobState", e);
- }
- searchAttributes =
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
-
+ Map<String, Object> searchAttributes =
TemporalWorkFlowUtils.generateGaasSearchAttributes(props);
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec, searchAttributes);
Optional<Integer> workunitsProcessed = Optional.empty();
try {
- workunitsProcessed =
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+ NestingExecWorkloadInput<WorkUnitClaimCheck>
+ performWorkloadInput = new
NestingExecWorkloadInput<>(WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(),
- Optional.empty()));
+ Optional.empty(), props);
+ workunitsProcessed =
Optional.of(processingWorkflow.performWorkload(performWorkloadInput));
} catch (Exception e) {
log.error("ProcessWorkUnits failure - attempting partial commit before
re-throwing exception", e);
try {
- performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes,
workunitsProcessed);// Attempt partial commit before surfacing the failure
+ performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes,
workunitsProcessed, props);// Attempt partial commit before surfacing the
failure
} catch (Exception commitException) {
// Combine current and commit exception messages for a more complete
context
String combinedMessage = String.format(
@@ -100,18 +93,18 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
}
throw e;// Re-throw after any partial commit, to fail the parent
workflow in case commitWorkflow didn't flow (unlikely)
}
- return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes,
workunitsProcessed);
+ return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes,
workunitsProcessed, props);
}
private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec
workSpec,
- Map<String, Object> searchAttributes, Optional<Integer>
workunitsProcessed) {
+ Map<String, Object> searchAttributes, Optional<Integer>
workunitsProcessed, Properties props) {
// we are only inhibiting commit when workunitsProcessed is actually
known to be zero
if (workunitsProcessed.filter(n -> n == 0).isPresent()) {
log.error("No work units processed, so no commit attempted.");
return CommitStats.createEmpty();
}
CommitStepWorkflow commitWorkflow =
createCommitStepWorkflow(searchAttributes);
- CommitStats result = commitWorkflow.commit(workSpec);
+ CommitStats result = commitWorkflow.commit(workSpec, props);
if (result.getNumCommittedWorkUnits() == 0) {
log.warn("No work units committed at the job level. They could have been
committed at the task level.");
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
index 86f1d4a092..5315f464f3 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
@@ -38,6 +38,7 @@ import
org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;
import org.apache.gobblin.temporal.loadgen.work.SimpleGeneratedWorkload;
+import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
@@ -86,6 +87,8 @@ public class GenArbitraryLoadJobLauncher extends
GobblinTemporalJobLauncher {
// WARNING: although type param must agree w/ that of `workload`, it's
entirely unverified by type checker!
// ...and more to the point, mismatch would occur at runtime
(`performWorkload` on the workflow type given to the stub)!
NestingExecWorkflow<IllustrationItem> workflow =
this.client.newWorkflowStub(NestingExecWorkflow.class, options);
- workflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
maxBranchesPerTree, maxSubTreesPerTree, Optional.empty());
+ NestingExecWorkloadInput<IllustrationItem> performWorkloadInput = new
NestingExecWorkloadInput<>(WorkflowAddr.ROOT,
+ workload, 0, maxBranchesPerTree, maxSubTreesPerTree, Optional.empty(),
new Properties());
+ workflow.performWorkload(performWorkloadInput);
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java
index 4346eecfdf..5355d9442d 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java
@@ -17,12 +17,13 @@
package org.apache.gobblin.temporal.loadgen.workflow.impl;
-import io.temporal.activity.ActivityOptions;
-import io.temporal.common.RetryOptions;
+import java.util.Properties;
+
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
-import java.time.Duration;
+
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
import org.apache.gobblin.temporal.loadgen.activity.IllustrationItemActivity;
import org.apache.gobblin.temporal.loadgen.work.IllustrationItem;
import
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl;
@@ -32,24 +33,10 @@ import
org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWork
public class NestingExecOfIllustrationItemActivityWorkflowImpl
extends AbstractNestingExecWorkflowImpl<IllustrationItem, String> {
- // RetryOptions specify how to automatically handle retries when Activities
fail.
- private static final RetryOptions ACTIVITY_RETRY_OPTS =
RetryOptions.newBuilder()
- .setInitialInterval(Duration.ofSeconds(1))
- .setMaximumInterval(Duration.ofSeconds(100))
- .setBackoffCoefficient(2)
- .setMaximumAttempts(3)
- .build();
-
- private static final ActivityOptions ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(Duration.ofSeconds(10))
- .setRetryOptions(ACTIVITY_RETRY_OPTS)
- .build();
-
- private final IllustrationItemActivity activityStub =
- Workflow.newActivityStub(IllustrationItemActivity.class, ACTIVITY_OPTS);
-
@Override
- protected Promise<String> launchAsyncActivity(final IllustrationItem item) {
+ protected Promise<String> launchAsyncActivity(final IllustrationItem item,
final Properties props) {
+ final IllustrationItemActivity activityStub =
+
Workflow.newActivityStub(IllustrationItemActivity.class,ActivityType.DEFAULT_ACTIVITY.buildActivityOptions(props));
return Async.function(activityStub::handleItem, item);
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/NestingExecWorkloadInput.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/NestingExecWorkloadInput.java
new file mode 100644
index 0000000000..4b2f3ff50e
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/NestingExecWorkloadInput.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.util.nesting.work;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+
+/** A wrapper class to be used as function param for {@link
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(NestingExecWorkloadInput)}*/
+@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@AllArgsConstructor
+public class NestingExecWorkloadInput<WORK_ITEM> {
+ private WorkflowAddr addr;
+ private Workload<WORK_ITEM> workload;
+ private int startIndex;
+ private int maxBranchesPerTree;
+ private int maxSubTreesPerTree;
+ private Optional<Integer> maxSubTreesForCurrentTreeOverride;
+ private Properties props;
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
index 92ef6e1af9..1863724a1f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -35,6 +36,7 @@ import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
+import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
@@ -46,14 +48,18 @@ public abstract class
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
public static final int
MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT = 100;
@Override
- public int performWorkload(
- final WorkflowAddr addr,
- final Workload<WORK_ITEM> workload,
- final int startIndex,
- final int maxBranchesPerTree,
- final int maxSubTreesPerTree,
- final Optional<Integer> maxSubTreesForCurrentTreeOverride
- ) {
+ public int performWorkload(NestingExecWorkloadInput<WORK_ITEM>
performWorkloadInput) {
+ return performWorkloadInternal(performWorkloadInput);
+ }
+
+ private int performWorkloadInternal(NestingExecWorkloadInput<WORK_ITEM>
performWorkloadInput) {
+ final WorkflowAddr addr = performWorkloadInput.getAddr();
+ final Workload<WORK_ITEM> workload = performWorkloadInput.getWorkload();
+ final int startIndex = performWorkloadInput.getStartIndex();
+ final int maxBranchesPerTree =
performWorkloadInput.getMaxBranchesPerTree();
+ final int maxSubTreesPerTree =
performWorkloadInput.getMaxSubTreesPerTree();
+ final Optional<Integer> maxSubTreesForCurrentTreeOverride =
performWorkloadInput.getMaxSubTreesForCurrentTreeOverride();
+ final Properties props = performWorkloadInput.getProps();
final int maxSubTreesForCurrent =
maxSubTreesForCurrentTreeOverride.orElse(maxSubTreesPerTree);
final int maxLeaves = maxBranchesPerTree - maxSubTreesForCurrent;
final Optional<Workload.WorkSpan<WORK_ITEM>> optSpan =
workload.getSpan(startIndex, maxLeaves);
@@ -65,7 +71,7 @@ public abstract class
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
final Workload.WorkSpan<WORK_ITEM> workSpan = optSpan.get();
final Iterable<WORK_ITEM> iterable = () -> workSpan;
final List<Promise<ACTIVITY_RESULT>> childActivities =
StreamSupport.stream(iterable.spliterator(), false)
- .map(t -> launchAsyncActivity(t))
+ .map(t -> launchAsyncActivity(t, props))
.collect(Collectors.toList());
final List<Promise<Integer>> childSubTrees = new ArrayList<>();
if (workSpan.getNumElems() == maxLeaves) { // received as many as
requested (did not stop short)
@@ -84,9 +90,11 @@ public abstract class
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
if (numDirectLeavesChildMayHave > 0) {
Workflow.sleep(calcPauseDurationBeforeCreatingSubTree(numDirectLeavesChildMayHave));
}
- childSubTrees.add(
- Async.function(child::performWorkload, childAddr, workload,
childStartIndex, maxBranchesPerTree,
- maxSubTreesPerTree,
Optional.of(subTreeChildMaxSubTreesPerTree)));
+ NestingExecWorkloadInput<WORK_ITEM> childInput = new
NestingExecWorkloadInput<>(
+ childAddr, workload, childStartIndex, maxBranchesPerTree,
+ maxSubTreesPerTree,
Optional.of(subTreeChildMaxSubTreesPerTree), props
+ );
+ childSubTrees.add(Async.function(child::performWorkload,
childInput));
++subTreeId;
}
}
@@ -103,7 +111,7 @@ public abstract class
AbstractNestingExecWorkflowImpl<WORK_ITEM, ACTIVITY_RESULT
}
/** Factory for invoking the specific activity by providing it args via
{@link Async::function} */
- protected abstract Promise<ACTIVITY_RESULT> launchAsyncActivity(WORK_ITEM
task);
+ protected abstract Promise<ACTIVITY_RESULT> launchAsyncActivity(WORK_ITEM
task, Properties props);
protected NestingExecWorkflow<WORK_ITEM> createChildWorkflow(final
WorkflowAddr childAddr) {
// preserve the current workflow ID of this parent, but add the
(hierarchical) address extension specific to each child
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
index 3a6661d090..e0c3ebfa9d 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java
@@ -17,12 +17,10 @@
package org.apache.gobblin.temporal.util.nesting.workflow;
-import java.util.Optional;
-
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
-import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
+import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
@@ -44,12 +42,5 @@ import
org.apache.gobblin.temporal.util.nesting.work.Workload;
public interface NestingExecWorkflow<WORK_ITEM> {
/** @return the number of workload elements processed cumulatively by this
Workflow and its children */
@WorkflowMethod
- int performWorkload(
- WorkflowAddr addr,
- Workload<WORK_ITEM> workload,
- int startIndex,
- int maxBranchesPerTree,
- int maxSubTreesPerTree,
- Optional<Integer> maxSubTreesForCurrentTreeOverride
- );
+ int performWorkload(NestingExecWorkloadInput<WORK_ITEM>
performWorkloadInput);
}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
new file mode 100644
index 0000000000..cdab706d86
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import io.temporal.activity.ActivityOptions;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+
+/** Tests for {@link ActivityType} */
+public class ActivityTypeTest {
+
+ private Properties props;
+ private final List<ActivityType> activityTypes =
Arrays.asList(ActivityType.values());
+
+ @BeforeMethod
+ public void setUp() {
+ props = new Properties();
+ }
+
+ @Test
+ public void testDefaultValuesForTimeouts() {
+ activityTypes.stream().map(activityType ->
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+ Assert.assertEquals(activityOptions.getStartToCloseTimeout(),
+
Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+ Assert.assertEquals(activityOptions.getHeartbeatTimeout(),
+
Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(),
+
Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS));
+
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(),
+
Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS));
+
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(),
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
0.01);
+
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(),
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS);
+ });
+ }
+
+ @DataProvider(name = "activityTypesWithStartToCloseTimeout")
+ public Object[][] activityTypesWithStartToCloseTimeout() {
+ return new Object[][] {
+ {ActivityType.GENERATE_WORKUNITS, 333},
+ {ActivityType.RECOMMEND_SCALING, 111},
+ {ActivityType.DELETE_WORK_DIRS, 222},
+ {ActivityType.PROCESS_WORKUNIT, 555},
+ {ActivityType.COMMIT, 444},
+ {ActivityType.DEFAULT_ACTIVITY, 1}
+ };
+ }
+
+ @Test(dataProvider = "activityTypesWithStartToCloseTimeout")
+ public void testStartToCloseTimeout(ActivityType activityType, int
expectedTimeout) {
+ props.setProperty(activityType.getStartToCloseTimeoutConfigKey(),
Integer.toString(expectedTimeout));
+
Assert.assertEquals(activityType.buildActivityOptions(props).getStartToCloseTimeout(),
Duration.ofMinutes(expectedTimeout));
+ }
+
+ @Test
+ public void testHeartBeatTimeout() {
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
"14");
+ activityTypes.stream().map(activityType ->
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+ Assert.assertEquals(activityOptions.getHeartbeatTimeout(),
Duration.ofMinutes(14));
+ });
+ }
+
+ @Test
+ public void testRetryOptions() {
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
"115");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
"5550");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
"7.0");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
"21");
+
+ activityTypes.stream().map(activityType ->
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(),
Duration.ofSeconds(115));
+
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(),
Duration.ofSeconds(5550));
+
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(),
7.0, 0.01);
+
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21);
+ });
+ }
+
+ @Test(dataProvider = "activityTypesWithStartToCloseTimeout")
+ public void testBuildActivityOptions(ActivityType activityType, int
expectedTimeout) {
+ props.setProperty(activityType.getStartToCloseTimeoutConfigKey(),
Integer.toString(expectedTimeout));
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
"144");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
"115");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
"5550");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
"7.0");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
"21");
+
+ ActivityOptions activityOptions = activityType.buildActivityOptions(props);
+
+ Assert.assertEquals(activityOptions.getStartToCloseTimeout(),
Duration.ofMinutes(expectedTimeout));
+ Assert.assertEquals(activityOptions.getHeartbeatTimeout(),
Duration.ofMinutes(144));
+
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(),
Duration.ofSeconds(115));
+
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(),
Duration.ofSeconds(5550));
+
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(),
7.0, 0.01);
+
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21);
+ }
+
+}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 28f0daa93f..db5518f54d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -92,6 +92,14 @@ public class PropertiesUtils {
return Long.parseLong(properties.getProperty(key,
Long.toString(defaultValue)));
}
+ public static double getPropAsDouble(Properties properties, String key,
double defaultValue) {
+ try {
+ return Double.parseDouble(properties.getProperty(key,
Double.toString(defaultValue)));
+ } catch (NullPointerException | NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
/**
* Get the value of a comma separated property as a {@link List} of strings.
*
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
index 42291f4cae..856067220d 100644
---
a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
@@ -101,4 +101,20 @@ public class PropertiesUtilsTest {
Assert.assertEqualsNoOrder(PropertiesUtils.getValuesAsList(properties,
Optional.of("k")).toArray(), new String[]{"v1", "v2", "v2"});
}
+
+ @Test
+ public void testGetPropAsDouble() {
+ Properties properties = new Properties();
+ properties.put("k1", "1.0");
+ properties.put("k2", "1");
+ properties.put("k3", "1.00");
+ properties.put("k4", "");
+
+ Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "key",
5.01), 5.01, 0.01);
+ Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k1",
2.02), 1.00, 0.01);
+ Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k2",
2.02), 1.00, 0.01);
+ Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k3",
2.02), 1.00, 0.01);
+ Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k4",
10.001), 10.001, 0.001);
+ }
+
}