[ 
https://issues.apache.org/jira/browse/GOBBLIN-2200?focusedWorklogId=967170&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-967170
 ]

ASF GitHub Bot logged work on GOBBLIN-2200:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Apr/25 05:05
            Start Date: 23/Apr/25 05:05
    Worklog Time Spent: 10m 
      Work Description: abhishekmjain commented on code in PR #4108:
URL: https://github.com/apache/gobblin/pull/4108#discussion_r2055251377


##########
gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java:
##########
@@ -38,6 +38,7 @@ public class AzkabanTags {
       .put(ConfigurationKeys.AZKABAN_FLOW_ID, "azkabanFlowId")
       .put(ConfigurationKeys.AZKABAN_JOB_ID, "azkabanJobId")
       .put(ConfigurationKeys.AZKABAN_EXEC_ID, "azkabanExecId")
+      .put(ConfigurationKeys.GAAS_JOB_EXEC_ID,"gaasJobExecId")

Review Comment:
   nit: let's put a space after ,



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -78,6 +80,27 @@ public void testSubmitNextNodesSuccess() throws 
URISyntaxException, IOException
     Mockito.verifyNoMoreInteractions(dagManagementStateStore);
   }
 
+  @Test
+  public void testGaaSJobExecutionIdInjection() throws URISyntaxException, 
IOException {
+    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+    List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
jobExecutionPlans.stream()
+        .map(Dag.DagNode<JobExecutionPlan>::new)
+        .collect(Collectors.toList());
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(),
 Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    // Assertion to test that GaaS job execution Id has been successfully 
injected
+    for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      final String gaasJobExecutionId = 
ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID,"");
+      final Long gaasJobExecutionIdHash = 
Long.parseLong(ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH,""));

Review Comment:
   nit: space after comma



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1047,6 +1047,12 @@ public class ConfigurationKeys {
   public static final String AZKABAN_FLOW_ID = "azkaban.flow.flowid";
   public static final String AZKABAN_JOB_ID = "azkaban.job.id";
   public static final String AZKABAN_EXEC_ID = "azkaban.flow.execid";
+  // Configuration Key for setting a unique job execution identifier in GaaS, 
the value is a UUID
+  public static final String GAAS_JOB_EXEC_ID = "gaas.job.execid";
+
+  // Configuration Key for storing hash of gaas.job.execid, to be used as 
jobExecutionId(integer) for backwards compatibility
+  public static final String GAAS_JOB_EXEC_ID_HASH = 
"gaas.job.executionid.hash";

Review Comment:
   why does the rename work?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -78,6 +80,27 @@ public void testSubmitNextNodesSuccess() throws 
URISyntaxException, IOException
     Mockito.verifyNoMoreInteractions(dagManagementStateStore);
   }
 
+  @Test
+  public void testGaaSJobExecutionIdInjection() throws URISyntaxException, 
IOException {
+    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+    List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
jobExecutionPlans.stream()
+        .map(Dag.DagNode<JobExecutionPlan>::new)
+        .collect(Collectors.toList());
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(),
 Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    // Assertion to test that GaaS job execution Id has been successfully 
injected
+    for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      final String gaasJobExecutionId = 
ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID,"");

Review Comment:
   nit: can we use `StringUtils.EMPTY` instead of "" ?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 967170)
    Time Spent: 1h 20m  (was: 1h 10m)

> Moving Away From Azkaban Execution Id
> -------------------------------------
>
>                 Key: GOBBLIN-2200
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2200
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Aditya Pratap Singh
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Moving Away From Azkaban Execution Id
> In several places we use azkaban.flow.exec.id config values to identify the 
> jobExecution, since we don't want to tie Gobblin to Azkaban, introducing a 
> new fieldĀ 
> gaas.job.execid which will serve as the configuration Key for a unique job 
> execution identifier in GaaS, the value is a UUID



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to