This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1d9398214e [GOBBLIN-2190] Added SUBMITGTE activity to ActivityType enum (#4102)
1d9398214e is described below
commit 1d9398214e03ebed55b6d00a7b8eb39ec1fcce22
Author: Vivek Rai <[email protected]>
AuthorDate: Wed Feb 26 16:11:29 2025 +0530
[GOBBLIN-2190] Added SUBMITGTE activity to ActivityType enum (#4102)
---
.../gobblin/temporal/GobblinTemporalConfigurationKeys.java | 2 ++
.../apache/gobblin/temporal/ddm/activity/ActivityType.java | 3 +++
.../temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java | 2 +-
.../ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java | 8 ++++----
.../ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java | 6 +++---
.../workflows/helloworld/GreetingWorkflowImpl.java | 3 ++-
.../temporal/workflows/metrics/TemporalEventTimer.java | 14 ++++----------
.../gobblin/temporal/ddm/activity/ActivityTypeTest.java | 6 ++++++
8 files changed, 25 insertions(+), 19 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index f2ebcc31ea..ffb1c424b5 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -104,6 +104,8 @@ public interface GobblinTemporalConfigurationKeys {
PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "submit.gte." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options";
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS =
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
index 7c796ebe35..54953abe10 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
@@ -49,6 +49,9 @@ public enum ActivityType {
/** Activity type for committing step. */
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+ /** Activity type for submitting GTE. */
+
SUBMIT_GTE(GobblinTemporalConfigurationKeys.TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
/** Default placeholder activity type. */
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index b440658904..5a3350fc7d 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -46,7 +46,7 @@ public class CommitStepWorkflowImpl implements
CommitStepWorkflow {
final CommitActivity activityStub =
Workflow.newActivityStub(CommitActivity.class,
ActivityType.COMMIT.buildActivityOptions(props));
CommitStats commitGobblinStats = activityStub.commit(workSpec);
if (!commitGobblinStats.getOptFailure().isPresent() ||
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext());
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext(),
props);
timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
.withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES,
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))
.submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 661847bdce..d841b00c22 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -83,14 +83,14 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
+ // Filtering only temporal job properties to pass to child workflows to avoid passing unnecessary properties
+ final Properties temporalJobProps =
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
+
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext,
temporalJobProps);
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME`
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult =
Optional.empty();
WUProcessingSpec wuSpec = createProcessingSpec(jobProps,
eventSubmitterContext);
- // Filtering only temporal job properties to pass to child workflows to avoid passing unnecessary properties
- final Properties temporalJobProps =
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
-
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
boolean isSuccessful = false;
try (Closer closer = Closer.create()) {
final GenerateWorkUnits genWUsActivityStub =
Workflow.newActivityStub(GenerateWorkUnits.class,
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 00def8a2ee..e0119cfc5d 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -55,7 +55,7 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
@Override
public CommitStats process(WUProcessingSpec workSpec, final Properties
props) {
- Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec);
+ Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec, props);
CommitStats result = performWork(workSpec, props);
timer.ifPresent(EventTimer::stop);
return result;
@@ -111,10 +111,10 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
return result;
}
- private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec
workSpec) {
+ private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec
workSpec, Properties props) {
if (workSpec.isToDoJobLevelTiming()) {
EventSubmitterContext eventSubmitterContext =
workSpec.getEventSubmitterContext();
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, props);
return Optional.of(timerFactory.createJobTimer());
} else {
return Optional.empty();
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index c576a49550..ea930b0573 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.temporal.workflows.helloworld;
import java.time.Duration;
+import java.util.Properties;
import org.slf4j.Logger;
@@ -57,7 +58,7 @@ public class GreetingWorkflowImpl implements GreetingWorkflow
{
/**
* Example of the {@link TemporalEventTimer.Factory} invoking child activity for instrumentation.
*/
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, new
Properties());
try (TemporalEventTimer timer =
timerFactory.create("getGreetingTime")) {
LOG.info("Executing getGreeting");
timer.withMetadata("name", name);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index 4a6aa8d0bb..6686b5cef9 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -19,19 +19,20 @@ package org.apache.gobblin.temporal.workflows.metrics;
import java.time.Duration;
import java.time.Instant;
+import java.util.Properties;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.util.GsonUtils;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
/**
@@ -153,16 +154,9 @@ public class TemporalEventTimer implements EventTimer {
* addition, it uses the `Workflow`-safe {@link Workflow#currentTimeMillis()}.
*/
public static class WithinWorkflowFactory extends Factory {
- private static final ActivityOptions DEFAULT_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(Duration.ofHours(6)) // maximum timeout for the actual event submission to kafka, waiting out a kafka outage
- .build();
- public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext) {
- this(eventSubmitterContext, DEFAULT_OPTS);
- }
-
- public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext,
ActivityOptions opts) {
- super(eventSubmitterContext,
Workflow.newActivityStub(SubmitGTEActivity.class, opts),
WithinWorkflowFactory::getCurrentInstant);
+ public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext,
Properties props) {
+ super(eventSubmitterContext,
Workflow.newActivityStub(SubmitGTEActivity.class,
ActivityType.SUBMIT_GTE.buildActivityOptions(props)),
WithinWorkflowFactory::getCurrentInstant);
}
/**
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
index cdab706d86..d94bda0ca5 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
@@ -69,10 +69,16 @@ public class ActivityTypeTest {
{ActivityType.DELETE_WORK_DIRS, 222},
{ActivityType.PROCESS_WORKUNIT, 555},
{ActivityType.COMMIT, 444},
+ {ActivityType.SUBMIT_GTE, 999},
{ActivityType.DEFAULT_ACTIVITY, 1}
};
}
+ @Test
+ public void
testActivityTypesWithStartToCloseTimeoutDataProviderHasAllActivityTypes() {
+ Assert.assertEquals(activityTypesWithStartToCloseTimeout().length,
activityTypes.size());
+ }
+
@Test(dataProvider = "activityTypesWithStartToCloseTimeout")
public void testStartToCloseTimeout(ActivityType activityType, int
expectedTimeout) {
props.setProperty(activityType.getStartToCloseTimeoutConfigKey(),
Integer.toString(expectedTimeout));