This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new be7c1a7f51 [GOBBLIN-2200] Onboard a new job executionId to deprecate 
Azkaban executionId (#4108)
be7c1a7f51 is described below

commit be7c1a7f51c096f7c35746a29987fd09581d6556
Author: pratapaditya04 <thesingh.adi.w...@gmail.com>
AuthorDate: Fri Apr 25 15:38:59 2025 +0530

    [GOBBLIN-2200] Onboard a new job executionId to deprecate Azkaban 
executionId (#4108)
    
    * created a new unique jobExecutionIdentifier to deprecate Azkaban 
executionId
---
 .../gobblin/configuration/ConfigurationKeys.java   |  6 ++++++
 .../apache/gobblin/azkaban/AzkabanJobLauncher.java |  9 ++++++--
 .../service/modules/spec/JobExecutionPlan.java     |  6 ++++++
 .../orchestration/proc/DagProcUtilsTest.java       | 24 ++++++++++++++++++++++
 .../gobblin/temporal/ddm/work/assistance/Help.java |  9 ++++----
 .../java/org/apache/gobblin/util/AzkabanTags.java  |  1 +
 6 files changed, 48 insertions(+), 7 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 6e07afac1c..1e9144640b 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1047,6 +1047,12 @@ public class ConfigurationKeys {
   public static final String AZKABAN_FLOW_ID = "azkaban.flow.flowid";
   public static final String AZKABAN_JOB_ID = "azkaban.job.id";
   public static final String AZKABAN_EXEC_ID = "azkaban.flow.execid";
+  // Configuration Key for setting a unique job execution identifier in GaaS, 
the value is a UUID
+  public static final String GAAS_JOB_EXEC_ID = "gaas.job.execid";
+
+  // Configuration Key for storing hash of gaas.job.execid, to be used as 
jobExecutionId(integer) for backwards compatibility
+  public static final String GAAS_JOB_EXEC_ID_HASH = 
"gaas.job.executionid.hash";
+
   public static final String AZKABAN_URL = "azkaban.link.execution.url";
   public static final String AZKABAN_FLOW_URL = "azkaban.link.workflow.url";
   public static final String AZKABAN_JOB_URL = "azkaban.link.job.url";
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 5da49e90bc..9ced348ac4 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.log4j.Level;
@@ -385,7 +386,11 @@ public class AzkabanJobLauncher extends AbstractJob 
implements ApplicationLaunch
    */
   private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties 
jobProps) {
     List<Tag<?>> metadataTags = Lists.newArrayList();
-    String jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID, "");
+    String jobExecutionId = 
jobProps.getProperty(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, 
StringUtils.EMPTY);
+    // As a fallback, set jobExecutionId to the Azkaban flow exec id if 
GAAS_JOB_EXEC_ID_HASH is not set
+    if (Strings.isNullOrEmpty(jobExecutionId)) {
+      jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID, 
StringUtils.EMPTY);
+    }
     // Display the proxy URL in the metadata tag if it exists
     String jobExecutionUrl = 
jobProps.getProperty(AZKABAN_LINK_JOBEXEC_PROXY_URL, 
jobProps.getProperty(AZKABAN_LINK_JOBEXEC_URL, ""));
 
@@ -407,7 +412,7 @@ public class AzkabanJobLauncher extends AbstractJob 
implements ApplicationLaunch
     metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
         jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
jobExecutionId)));
 
-    //Use azkaban.flow.execid as the jobExecutionId
+    //Use gaas.job.executionid.hash (or azkaban.flow.execid as a fallback) as the jobExecutionId
     metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId));
 
     metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index ea07cf435b..bbd728a04f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.spec;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -116,6 +117,9 @@ public class JobExecutionPlan {
 
       String jobName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_NAME_KEY, "");
       String edgeId = ConfigUtils.getString(jobConfig, 
FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
+      final String gaasJobExecutionId = UUID.randomUUID().toString(); // 
Creating a unique Identifier for JobExecution
+      final int gaasJobExecutionIdHash = gaasJobExecutionId.hashCode();  // 
Passing the hashCode of the uniqueIdentifier to be used as jobExecutionId for 
backward compatibility
+
       if (!ConfigUtils.getBoolean(jobConfig, JOB_MAINTAIN_JOBNAME, false) || 
jobName.isEmpty()) {
         // Modify the job name to include the flow group, flow name, edge id, 
and a random string to avoid collisions since
         // job names are assumed to be unique within a dag.
@@ -152,6 +156,8 @@ public class JobExecutionPlan {
           .withValue(ConfigurationKeys.FLOW_FAILURE_OPTION, 
ConfigValueFactory.fromAnyRef(flowFailureOption))
           .withValue(ConfigurationKeys.FLOW_EDGE_ID_KEY, 
ConfigValueFactory.fromAnyRef(edgeId))
           .withValue(FlowSpec.MODIFICATION_TIME_KEY, 
ConfigValueFactory.fromAnyRef(flowModTime))
+          .withValue(ConfigurationKeys.GAAS_JOB_EXEC_ID, 
ConfigValueFactory.fromAnyRef(gaasJobExecutionId)) // Setting a unique 
Identifier for jobExecution
+          .withValue(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, 
ConfigValueFactory.fromAnyRef(gaasJobExecutionIdHash))
       );
 
       //Add tracking config to JobSpec.
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
index 7b128f120c..ac9449f606 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
@@ -44,7 +44,10 @@ import 
org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.commons.lang.StringUtils;
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -78,6 +81,27 @@ public class DagProcUtilsTest {
     Mockito.verifyNoMoreInteractions(dagManagementStateStore);
   }
 
+  @Test
+  public void testGaaSJobExecutionIdInjection() throws URISyntaxException, 
IOException {
+    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+    List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
jobExecutionPlans.stream()
+        .map(Dag.DagNode<JobExecutionPlan>::new)
+        .collect(Collectors.toList());
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(),
 Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    // Assertion to test that GaaS job execution Id has been successfully 
injected
+    for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      final String gaasJobExecutionId = 
ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(), 
ConfigurationKeys.GAAS_JOB_EXEC_ID, StringUtils.EMPTY);
+      final Long gaasJobExecutionIdHash = 
Long.parseLong(ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(), 
ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, StringUtils.EMPTY));
+
+      Assert.assertNotNull(gaasJobExecutionId);
+      Assert.assertEquals(gaasJobExecutionId.length(), 36);
+      Assert.assertNotNull(gaasJobExecutionIdHash);
+    }
+  }
+
   @Test
   public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, 
IOException {
     Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 2c3b34fce0..2a2cf17049 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -59,7 +59,6 @@ import org.apache.gobblin.util.SerializationUtils;
 public class Help {
   public static final int MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS = 5;
   public static final int LOG_CACHE_STATS_EVERY_N_ACCESSES = 1000;
-  public static final String AZKABAN_FLOW_EXEC_ID_KEY = "azkaban.flow.execid";
   public static final String USER_TO_PROXY_KEY = "user.to.proxy";
   public static final String USER_TO_PROXY_SEARCH_KEY = "userToProxy";
   public static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey";
@@ -104,9 +103,9 @@ public class Help {
   public static String calcPerExecQualifier(Config workerConfig) {
     String userToProxy = workerConfig.hasPath(USER_TO_PROXY_KEY)
         ? workerConfig.getString(USER_TO_PROXY_KEY) : "";
-    String azFlowExecId = workerConfig.hasPath(AZKABAN_FLOW_EXEC_ID_KEY)
-        ? workerConfig.getString(AZKABAN_FLOW_EXEC_ID_KEY) : 
UUID.randomUUID().toString();
-    return userToProxy + "_" + azFlowExecId;
+    String gaasFlowExecId = 
workerConfig.hasPath(ConfigurationKeys.GAAS_JOB_EXEC_ID)
+        ? workerConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID) : 
UUID.randomUUID().toString();
+    return userToProxy + "_" + gaasFlowExecId;
   }
 
   public static FileSystem loadFileSystem(FileSystemApt a) throws IOException {
@@ -279,4 +278,4 @@ public class Help {
     troubleshooter.logIssueSummary();
     troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java
index e629c372de..4d7f4fad40 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java
@@ -38,6 +38,7 @@ public class AzkabanTags {
       .put(ConfigurationKeys.AZKABAN_FLOW_ID, "azkabanFlowId")
       .put(ConfigurationKeys.AZKABAN_JOB_ID, "azkabanJobId")
       .put(ConfigurationKeys.AZKABAN_EXEC_ID, "azkabanExecId")
+      .put(ConfigurationKeys.GAAS_JOB_EXEC_ID, "gaasJobExecId")
       .put(ConfigurationKeys.AZKABAN_URL, "azkabanURL")
       .put(ConfigurationKeys.AZKABAN_FLOW_URL, "azkabanFlowURL")
       .put(ConfigurationKeys.AZKABAN_JOB_URL, "azkabanJobURL")

Reply via email to