This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new be7c1a7f51 [GOBBLIN-2200] Onboard a new job executionId to deprecate Azkaban executionId (#4108)
be7c1a7f51 is described below
commit be7c1a7f51c096f7c35746a29987fd09581d6556
Author: pratapaditya04 <[email protected]>
AuthorDate: Fri Apr 25 15:38:59 2025 +0530
[GOBBLIN-2200] Onboard a new job executionId to deprecate Azkaban executionId (#4108)
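* Created a new unique job execution identifier (gaas.job.execid, a UUID) plus its hash (gaas.job.executionid.hash, kept for backward compatibility) to deprecate the Azkaban executionId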
---
.../gobblin/configuration/ConfigurationKeys.java | 6 ++++++
.../apache/gobblin/azkaban/AzkabanJobLauncher.java | 9 ++++++--
.../service/modules/spec/JobExecutionPlan.java | 6 ++++++
.../orchestration/proc/DagProcUtilsTest.java | 24 ++++++++++++++++++++++
.../gobblin/temporal/ddm/work/assistance/Help.java | 9 ++++----
.../java/org/apache/gobblin/util/AzkabanTags.java | 1 +
6 files changed, 48 insertions(+), 7 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 6e07afac1c..1e9144640b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1047,6 +1047,12 @@ public class ConfigurationKeys {
public static final String AZKABAN_FLOW_ID = "azkaban.flow.flowid";
public static final String AZKABAN_JOB_ID = "azkaban.job.id";
public static final String AZKABAN_EXEC_ID = "azkaban.flow.execid";
+ // Configuration Key for setting a unique job execution identifier in GaaS,
the value is a UUID
+ public static final String GAAS_JOB_EXEC_ID = "gaas.job.execid";
+
+ // Configuration Key for storing hash of gaas.job.execid, to be used as
jobExecutionId(integer) for backwards compatibility
+ public static final String GAAS_JOB_EXEC_ID_HASH =
"gaas.job.executionid.hash";
+
public static final String AZKABAN_URL = "azkaban.link.execution.url";
public static final String AZKABAN_FLOW_URL = "azkaban.link.workflow.url";
public static final String AZKABAN_JOB_URL = "azkaban.link.job.url";
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index 5da49e90bc..9ced348ac4 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.log4j.Level;
@@ -385,7 +386,11 @@ public class AzkabanJobLauncher extends AbstractJob
implements ApplicationLaunch
*/
private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties
jobProps) {
List<Tag<?>> metadataTags = Lists.newArrayList();
- String jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID, "");
+ String jobExecutionId =
jobProps.getProperty(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH,
StringUtils.EMPTY);
+ //As a fallback setting the value of jobExecutionId to Azkaban Flow exec
Id if GAAS_JOB_EXEC_ID is not set
+ if (Strings.isNullOrEmpty(jobExecutionId)) {
+ jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID,
StringUtils.EMPTY);
+ }
// Display the proxy URL in the metadata tag if it exists
String jobExecutionUrl =
jobProps.getProperty(AZKABAN_LINK_JOBEXEC_PROXY_URL,
jobProps.getProperty(AZKABAN_LINK_JOBEXEC_URL, ""));
@@ -407,7 +412,7 @@ public class AzkabanJobLauncher extends AbstractJob
implements ApplicationLaunch
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
jobExecutionId)));
- //Use azkaban.flow.execid as the jobExecutionId
+ //Use gaas.job.execid.hash as the jobExecutionId
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId));
metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index ea07cf435b..bbd728a04f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.spec;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -116,6 +117,9 @@ public class JobExecutionPlan {
String jobName = ConfigUtils.getString(jobConfig,
ConfigurationKeys.JOB_NAME_KEY, "");
String edgeId = ConfigUtils.getString(jobConfig,
FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
+ final String gaasJobExecutionId = UUID.randomUUID().toString(); //
Creating a unique Identifier for JobExecution
+ final int gaasJobExecutionIdHash = gaasJobExecutionId.hashCode(); //
Passing the hashCode of the uniqueIdentifier to be used as jobExecutionId for
backward compatibility
+
if (!ConfigUtils.getBoolean(jobConfig, JOB_MAINTAIN_JOBNAME, false) ||
jobName.isEmpty()) {
// Modify the job name to include the flow group, flow name, edge id,
and a random string to avoid collisions since
// job names are assumed to be unique within a dag.
@@ -152,6 +156,8 @@ public class JobExecutionPlan {
.withValue(ConfigurationKeys.FLOW_FAILURE_OPTION,
ConfigValueFactory.fromAnyRef(flowFailureOption))
.withValue(ConfigurationKeys.FLOW_EDGE_ID_KEY,
ConfigValueFactory.fromAnyRef(edgeId))
.withValue(FlowSpec.MODIFICATION_TIME_KEY,
ConfigValueFactory.fromAnyRef(flowModTime))
+ .withValue(ConfigurationKeys.GAAS_JOB_EXEC_ID,
ConfigValueFactory.fromAnyRef(gaasJobExecutionId)) // Setting a unique
Identifier for jobExecution
+ .withValue(ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH,
ConfigValueFactory.fromAnyRef(gaasJobExecutionIdHash))
);
//Add tracking config to JobSpec.
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
index 7b128f120c..ac9449f606 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java
@@ -44,7 +44,10 @@ import
org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.commons.lang.StringUtils;
import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -78,6 +81,27 @@ public class DagProcUtilsTest {
Mockito.verifyNoMoreInteractions(dagManagementStateStore);
}
+ @Test
+ public void testGaaSJobExecutionIdInjection() throws URISyntaxException,
IOException {
+ Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+ List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+ List<Dag.DagNode<JobExecutionPlan>> dagNodeList =
jobExecutionPlans.stream()
+ .map(Dag.DagNode<JobExecutionPlan>::new)
+ .collect(Collectors.toList());
+ Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+ // Assertion to test that GaaS job execution Id has been successfully
injected
+ for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+ final String gaasJobExecutionId =
ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),
ConfigurationKeys.GAAS_JOB_EXEC_ID, StringUtils.EMPTY);
+ final Long gaasJobExecutionIdHash =
Long.parseLong(ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),
ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH, StringUtils.EMPTY));
+
+ Assert.assertNotNull(gaasJobExecutionId);
+ Assert.assertEquals(gaasJobExecutionId.length(), 36);
+ Assert.assertNotNull(gaasJobExecutionIdHash);
+ }
+ }
+
@Test
public void testWhenSubmitToExecutorSuccess() throws URISyntaxException,
IOException {
Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 2c3b34fce0..2a2cf17049 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -59,7 +59,6 @@ import org.apache.gobblin.util.SerializationUtils;
public class Help {
public static final int MAX_DESERIALIZATION_FS_LOAD_ATTEMPTS = 5;
public static final int LOG_CACHE_STATS_EVERY_N_ACCESSES = 1000;
- public static final String AZKABAN_FLOW_EXEC_ID_KEY = "azkaban.flow.execid";
public static final String USER_TO_PROXY_KEY = "user.to.proxy";
public static final String USER_TO_PROXY_SEARCH_KEY = "userToProxy";
public static final String GAAS_FLOW_ID_SEARCH_KEY = "gaasFlowIdSearchKey";
@@ -104,9 +103,9 @@ public class Help {
public static String calcPerExecQualifier(Config workerConfig) {
String userToProxy = workerConfig.hasPath(USER_TO_PROXY_KEY)
? workerConfig.getString(USER_TO_PROXY_KEY) : "";
- String azFlowExecId = workerConfig.hasPath(AZKABAN_FLOW_EXEC_ID_KEY)
- ? workerConfig.getString(AZKABAN_FLOW_EXEC_ID_KEY) :
UUID.randomUUID().toString();
- return userToProxy + "_" + azFlowExecId;
+ String gaasFlowExecId =
workerConfig.hasPath(ConfigurationKeys.GAAS_JOB_EXEC_ID)
+ ? workerConfig.getString(ConfigurationKeys.GAAS_JOB_EXEC_ID) :
UUID.randomUUID().toString();
+ return userToProxy + "_" + gaasFlowExecId;
}
public static FileSystem loadFileSystem(FileSystemApt a) throws IOException {
@@ -279,4 +278,4 @@ public class Help {
troubleshooter.logIssueSummary();
troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
}
-}
+}
\ No newline at end of file
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java
index e629c372de..4d7f4fad40 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java
@@ -38,6 +38,7 @@ public class AzkabanTags {
.put(ConfigurationKeys.AZKABAN_FLOW_ID, "azkabanFlowId")
.put(ConfigurationKeys.AZKABAN_JOB_ID, "azkabanJobId")
.put(ConfigurationKeys.AZKABAN_EXEC_ID, "azkabanExecId")
+ .put(ConfigurationKeys.GAAS_JOB_EXEC_ID, "gaasJobExecId")
.put(ConfigurationKeys.AZKABAN_URL, "azkabanURL")
.put(ConfigurationKeys.AZKABAN_FLOW_URL, "azkabanFlowURL")
.put(ConfigurationKeys.AZKABAN_JOB_URL, "azkabanJobURL")