This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c6c24fe47 [GOBBLIN-2207] Make temporal unique classifier GaaS attempt aware (#4116)
1c6c24fe47 is described below

commit 1c6c24fe47c1e4df3bfbdd60c161751248c9702c
Author: pratapaditya04 <[email protected]>
AuthorDate: Mon May 12 12:08:11 2025 +0530

    [GOBBLIN-2207] Make temporal unique classifier GaaS attempt aware (#4116)
    
    * made temporal unique classifier gaas attempt aware
---
 .../java/org/apache/gobblin/configuration/ConfigurationKeys.java     | 2 +-
 .../src/main/java/org/apache/gobblin/cluster/HelixUtils.java         | 4 ++--
 .../src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java | 4 ++--
 .../org/apache/gobblin/service/modules/orchestration/DagUtils.java   | 2 +-
 .../java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java   | 5 ++++-
 .../gobblin/temporal/workflows/metrics/EventSubmitterContext.java    | 4 ++--
 6 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 1e9144640b..03f1a2629e 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -191,7 +191,7 @@ public class ConfigurationKeys {
   public static final String JOB_GROUP_KEY = "job.group";
   public static final String JOB_TAG_KEY = "job.tag";
   public static final String JOB_DESCRIPTION_KEY = "job.description";
-  public static final String JOB_CURRENT_ATTEMPTS = "job.currentAttempts";
+  public static final String JOB_ATTEMPT_ID = "job.attemptId";
   public static final String JOB_CURRENT_GENERATION = "job.currentGeneration";
   // Job launcher type
   public static final String JOB_LAUNCHER_TYPE_KEY = "launcher.type";
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 45c3685d17..cd0d2d0e7c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -183,9 +183,9 @@ public class HelixUtils {
           jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
jobExecutionId)));
     }
 
-    if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+    if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) {
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
-          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
+          jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1")));
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
           jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, 
"1")));
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 9ced348ac4..e1e3dcfbc7 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -399,9 +399,9 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
     metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
         jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
 
-    if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+    if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) {
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
-          jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, "1")));
+          jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1")));
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
           jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, 
"1")));
       metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java
index a2f5e1062d..a5a0850217 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagUtils.java
@@ -128,7 +128,7 @@ public class DagUtils {
 
   public static JobSpec getJobSpec(DagNode<JobExecutionPlan> dagNode) {
     JobSpec jobSpec = dagNode.getValue().getJobSpec();
-    Map<String, String> configWithCurrentAttempts = 
ImmutableMap.of(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, 
String.valueOf(dagNode.getValue().getCurrentAttempts()),
+    Map<String, String> configWithCurrentAttempts = 
ImmutableMap.of(ConfigurationKeys.JOB_ATTEMPT_ID, 
String.valueOf(dagNode.getValue().getCurrentAttempts()),
         ConfigurationKeys.JOB_CURRENT_GENERATION, 
String.valueOf(dagNode.getValue().getCurrentGeneration()));
     Properties configAsProperties = (Properties) 
jobSpec.getConfigAsProperties().clone();
     configAsProperties.putAll(configWithCurrentAttempts);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 2a2cf17049..8ee9a69091 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -62,6 +62,7 @@ public class Help {
   public static final String USER_TO_PROXY_KEY = "user.to.proxy";
   public static final String USER_TO_PROXY_SEARCH_KEY = "userToProxy";
   public static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey";
+  public static final String DEFAULT_GAAS_ATTEMPT_ID = "1";
 
   // treat `JobState` as immutable and cache, for reuse among activities 
executed by the same worker
   private static final transient Cache<Path, JobState> jobStateByPath = 
CacheBuilder.newBuilder().recordStats().build();
@@ -105,7 +106,9 @@ public class Help {
         ? workerConfig.getString(USER_TO_PROXY_KEY) : "";
     String gaasFlowExecId = 
workerConfig.hasPath(ConfigurationKeys.GAAS_JOB_EXEC_ID)
         ? workerConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID) : 
UUID.randomUUID().toString();
-    return userToProxy + "_" + gaasFlowExecId;
+    String gaasAttemptId = 
workerConfig.hasPath(ConfigurationKeys.JOB_ATTEMPT_ID)
+        ? workerConfig.getString(ConfigurationKeys.JOB_ATTEMPT_ID) : 
DEFAULT_GAAS_ATTEMPT_ID;
+    return String.join("_", userToProxy, gaasFlowExecId, gaasAttemptId);
   }
 
   public static FileSystem loadFileSystem(FileSystemApt a) throws IOException {
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
index 6ef80f00d8..e23b5dab7e 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventSubmitterContext.java
@@ -109,9 +109,9 @@ public class EventSubmitterContext {
         this.tags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)));
       }
 
-      if (jobProps.containsKey(ConfigurationKeys.JOB_CURRENT_ATTEMPTS)) {
+      if (jobProps.containsKey(ConfigurationKeys.JOB_ATTEMPT_ID)) {
         this.tags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD,
-            jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_ATTEMPTS, 
"1")));
+            jobProps.getProperty(ConfigurationKeys.JOB_ATTEMPT_ID, "1")));
         this.tags.add(new 
Tag<>(TimingEvent.FlowEventConstants.CURRENT_GENERATION_FIELD,
             jobProps.getProperty(ConfigurationKeys.JOB_CURRENT_GENERATION, 
"1")));
         this.tags.add(new 
Tag<>(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD,

Reply via email to