[ https://issues.apache.org/jira/browse/GOBBLIN-2200?focusedWorklogId=967170&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-967170 ]
ASF GitHub Bot logged work on GOBBLIN-2200:
-------------------------------------------

Author: ASF GitHub Bot
Created on: 23/Apr/25 05:05
Start Date: 23/Apr/25 05:05
Worklog Time Spent: 10m
Work Description: abhishekmjain commented on code in PR #4108:
URL: https://github.com/apache/gobblin/pull/4108#discussion_r2055251377


##########
gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java:
##########
@@ -38,6 +38,7 @@ public class AzkabanTags {
       .put(ConfigurationKeys.AZKABAN_FLOW_ID, "azkabanFlowId")
       .put(ConfigurationKeys.AZKABAN_JOB_ID, "azkabanJobId")
       .put(ConfigurationKeys.AZKABAN_EXEC_ID, "azkabanExecId")
+      .put(ConfigurationKeys.GAAS_JOB_EXEC_ID,"gaasJobExecId")

Review Comment:
   nit: let's put a space after ,


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -78,6 +80,27 @@ public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException
     Mockito.verifyNoMoreInteractions(dagManagementStateStore);
   }
+  @Test
+  public void testGaaSJobExecutionIdInjection() throws URISyntaxException, IOException {
+    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+    List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = jobExecutionPlans.stream()
+        .map(Dag.DagNode<JobExecutionPlan>::new)
+        .collect(Collectors.toList());
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    // Assertion to test that GaaS job execution Id has been successfully injected
+    for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      final String gaasJobExecutionId = ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID,"");
+      final Long gaasJobExecutionIdHash = Long.parseLong(ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH,""));

Review Comment:
   nit: space after comma


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1047,6 +1047,12 @@ public class ConfigurationKeys {
   public static final String AZKABAN_FLOW_ID = "azkaban.flow.flowid";
   public static final String AZKABAN_JOB_ID = "azkaban.job.id";
   public static final String AZKABAN_EXEC_ID = "azkaban.flow.execid";
+  // Configuration Key for setting a unique job execution identifier in GaaS, the value is a UUID
+  public static final String GAAS_JOB_EXEC_ID = "gaas.job.execid";
+
+  // Configuration Key for storing hash of gaas.job.execid, to be used as jobExecutionId(integer) for backwards compatibility
+  public static final String GAAS_JOB_EXEC_ID_HASH = "gaas.job.executionid.hash";

Review Comment:
   why does the rename work?


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -78,6 +80,27 @@ public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException
     Mockito.verifyNoMoreInteractions(dagManagementStateStore);
   }
+  @Test
+  public void testGaaSJobExecutionIdInjection() throws URISyntaxException, IOException {
+    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+    List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = jobExecutionPlans.stream()
+        .map(Dag.DagNode<JobExecutionPlan>::new)
+        .collect(Collectors.toList());
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    // Assertion to test that GaaS job execution Id has been successfully injected
+    for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      final String gaasJobExecutionId = ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID,"");

Review Comment:
   nit: can we use `StringUtils.EMPTY` instead of "" ?


Issue Time Tracking
-------------------

Worklog Id: (was: 967170)
Time Spent: 1h 20m (was: 1h 10m)

> Moving Away From Azkaban Execution Id
> -------------------------------------
>
> Key: GOBBLIN-2200
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2200
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Aditya Pratap Singh
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Moving Away From Azkaban Execution Id
> In several places we use azkaban.flow.execid config values to identify the
> job execution. Since we don't want to tie Gobblin to Azkaban, we are
> introducing a new field, gaas.job.execid, which serves as the configuration
> key for a unique job execution identifier in GaaS; its value is a UUID.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
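The ID scheme discussed in this issue and in the ConfigurationKeys diff (a UUID under gaas.job.execid, plus a hash of it under gaas.job.executionid.hash for code that still expects an integer jobExecutionId) can be sketched roughly as below. This is a hypothetical illustration under stated assumptions, not the implementation in PR #4108: the class GaasJobExecIdSketch, its methods, the plain Map standing in for a job's config, and the XOR-fold hash are all invented for this example, since the excerpt does not show which hash function GaaS uses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch, not code from PR #4108: pairs a UUID execution id
// with a long-valued hash for callers that still expect a numeric id.
final class GaasJobExecIdSketch {
  // Key names copied from the ConfigurationKeys diff.
  static final String GAAS_JOB_EXEC_ID = "gaas.job.execid";
  static final String GAAS_JOB_EXEC_ID_HASH = "gaas.job.executionid.hash";

  private GaasJobExecIdSketch() {
  }

  // Assumed hash: XOR-fold the UUID's two 64-bit halves, then clear the sign
  // bit so the result is non-negative. The PR's actual hash is not shown.
  static long hashExecId(String execId) {
    UUID uuid = UUID.fromString(execId);
    return (uuid.getMostSignificantBits() ^ uuid.getLeastSignificantBits()) & Long.MAX_VALUE;
  }

  // Returns a copy of the job's settings (a plain Map standing in for the
  // real job config) with a fresh UUID and its hash added as strings.
  static Map<String, String> injectExecId(Map<String, String> jobConfig) {
    Map<String, String> updated = new HashMap<>(jobConfig);
    String execId = UUID.randomUUID().toString();
    updated.put(GAAS_JOB_EXEC_ID, execId);
    updated.put(GAAS_JOB_EXEC_ID_HASH, Long.toString(hashExecId(execId)));
    return updated;
  }

  public static void main(String[] args) {
    Map<String, String> config = injectExecId(new HashMap<>());
    String execId = config.get(GAAS_JOB_EXEC_ID);
    // Read back with Long.parseLong, the same way the test in the diff reads the hash key.
    long execIdHash = Long.parseLong(config.get(GAAS_JOB_EXEC_ID_HASH));
    System.out.println(GAAS_JOB_EXEC_ID + "=" + execId);
    System.out.println(GAAS_JOB_EXEC_ID_HASH + "=" + execIdHash);
  }
}
```

Storing the hash as a decimal string lets it be parsed with Long.parseLong, as the test in the diff does. Clearing the sign bit keeps the value non-negative, but folding 128 bits into 63 is not collision-free, so distinct UUIDs can share a hash and the UUID should remain the authoritative identifier.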