This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d9398214e [GOBBLIN-2190] Added SUBMITGTE activity to ActivityType enum (#4102)
1d9398214e is described below

commit 1d9398214e03ebed55b6d00a7b8eb39ec1fcce22
Author: Vivek Rai <[email protected]>
AuthorDate: Wed Feb 26 16:11:29 2025 +0530

    [GOBBLIN-2190] Added SUBMITGTE activity to ActivityType enum (#4102)
---
 .../gobblin/temporal/GobblinTemporalConfigurationKeys.java |  2 ++
 .../apache/gobblin/temporal/ddm/activity/ActivityType.java |  3 +++
 .../temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java |  2 +-
 .../ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java      |  8 ++++----
 .../ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java    |  6 +++---
 .../workflows/helloworld/GreetingWorkflowImpl.java         |  3 ++-
 .../temporal/workflows/metrics/TemporalEventTimer.java     | 14 ++++----------
 .../gobblin/temporal/ddm/activity/ActivityTypeTest.java    |  6 ++++++
 8 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index f2ebcc31ea..ffb1c424b5 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -104,6 +104,8 @@ public interface GobblinTemporalConfigurationKeys {
       PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
   String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
       PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+  String TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+      PREFIX + "submit.gte." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
   String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options";
   String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
   int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
index 7c796ebe35..54953abe10 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java
@@ -49,6 +49,9 @@ public enum ActivityType {
   /** Activity type for committing step. */
   
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
 
+  /** Activity type for submitting GTE. */
+  
SUBMIT_GTE(GobblinTemporalConfigurationKeys.TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
   /** Default placeholder activity type. */
   
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index b440658904..5a3350fc7d 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -46,7 +46,7 @@ public class CommitStepWorkflowImpl implements 
CommitStepWorkflow {
     final CommitActivity activityStub = 
Workflow.newActivityStub(CommitActivity.class, 
ActivityType.COMMIT.buildActivityOptions(props));
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
     if (!commitGobblinStats.getOptFailure().isPresent() || 
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
-      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext());
+      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext(), 
props);
       timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
           .withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES, 
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))
           .submit();// emit job summary info on both full and partial commit 
(ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 661847bdce..d841b00c22 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -83,14 +83,14 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
 
   @Override
   public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
-    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
+    // Filtering only temporal job properties to pass to child workflows to 
avoid passing unnecessary properties
+    final Properties temporalJobProps = 
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
+        
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
+    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, 
temporalJobProps);
     timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // 
update GaaS: `TimingEvent.JOB_START_TIME`
     EventTimer jobSuccessTimer = timerFactory.createJobTimer();
     Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult = 
Optional.empty();
     WUProcessingSpec wuSpec = createProcessingSpec(jobProps, 
eventSubmitterContext);
-    // Filtering only temporal job properties to pass to child workflows to 
avoid passing unnecessary properties
-    final Properties temporalJobProps = 
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
-        
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
     boolean isSuccessful = false;
     try (Closer closer = Closer.create()) {
       final GenerateWorkUnits genWUsActivityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class,
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 00def8a2ee..e0119cfc5d 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -55,7 +55,7 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
 
   @Override
   public CommitStats process(WUProcessingSpec workSpec, final Properties 
props) {
-    Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec);
+    Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec, props);
     CommitStats result = performWork(workSpec, props);
     timer.ifPresent(EventTimer::stop);
     return result;
@@ -111,10 +111,10 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
     return result;
   }
 
-  private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec 
workSpec) {
+  private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec 
workSpec, Properties props) {
     if (workSpec.isToDoJobLevelTiming()) {
       EventSubmitterContext eventSubmitterContext = 
workSpec.getEventSubmitterContext();
-      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
+      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, props);
       return Optional.of(timerFactory.createJobTimer());
     } else {
       return Optional.empty();
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index c576a49550..ea930b0573 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.temporal.workflows.helloworld;
 
 import java.time.Duration;
+import java.util.Properties;
 
 import org.slf4j.Logger;
 
@@ -57,7 +58,7 @@ public class GreetingWorkflowImpl implements GreetingWorkflow 
{
         /**
          * Example of the {@link TemporalEventTimer.Factory} invoking child 
activity for instrumentation.
          */
-        TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
+        TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, new 
Properties());
         try (TemporalEventTimer timer = 
timerFactory.create("getGreetingTime")) {
             LOG.info("Executing getGreeting");
             timer.withMetadata("name", name);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index 4a6aa8d0bb..6686b5cef9 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -19,19 +19,20 @@ package org.apache.gobblin.temporal.workflows.metrics;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Properties;
 import java.util.function.Supplier;
 
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
-import io.temporal.activity.ActivityOptions;
 import io.temporal.workflow.Workflow;
 
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.util.GsonUtils;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
 
 
 /**
@@ -153,16 +154,9 @@ public class TemporalEventTimer implements EventTimer {
    *  addition, it uses the `Workflow`-safe {@link 
Workflow#currentTimeMillis()}.
    */
   public static class WithinWorkflowFactory extends Factory {
-    private static final ActivityOptions DEFAULT_OPTS = 
ActivityOptions.newBuilder()
-        .setStartToCloseTimeout(Duration.ofHours(6)) // maximum timeout for 
the actual event submission to kafka, waiting out a kafka outage
-        .build();
 
-    public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext) {
-      this(eventSubmitterContext, DEFAULT_OPTS);
-    }
-
-    public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext, 
ActivityOptions opts) {
-      super(eventSubmitterContext, 
Workflow.newActivityStub(SubmitGTEActivity.class, opts), 
WithinWorkflowFactory::getCurrentInstant);
+    public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext, 
Properties props) {
+      super(eventSubmitterContext, 
Workflow.newActivityStub(SubmitGTEActivity.class, 
ActivityType.SUBMIT_GTE.buildActivityOptions(props)), 
WithinWorkflowFactory::getCurrentInstant);
     }
 
     /**
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
index cdab706d86..d94bda0ca5 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java
@@ -69,10 +69,16 @@ public class ActivityTypeTest {
         {ActivityType.DELETE_WORK_DIRS, 222},
         {ActivityType.PROCESS_WORKUNIT, 555},
         {ActivityType.COMMIT, 444},
+        {ActivityType.SUBMIT_GTE, 999},
         {ActivityType.DEFAULT_ACTIVITY, 1}
     };
   }
 
+  @Test
+  public void 
testActivityTypesWithStartToCloseTimeoutDataProviderHasAllActivityTypes() {
+    Assert.assertEquals(activityTypesWithStartToCloseTimeout().length, 
activityTypes.size());
+  }
+
   @Test(dataProvider = "activityTypesWithStartToCloseTimeout")
   public void testStartToCloseTimeout(ActivityType activityType, int 
expectedTimeout) {
     props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), 
Integer.toString(expectedTimeout));

Reply via email to