abhishekmjain commented on code in PR #4108:
URL: https://github.com/apache/gobblin/pull/4108#discussion_r2055251377


##########
gobblin-utility/src/main/java/org/apache/gobblin/util/AzkabanTags.java:
##########
@@ -38,6 +38,7 @@ public class AzkabanTags {
       .put(ConfigurationKeys.AZKABAN_FLOW_ID, "azkabanFlowId")
       .put(ConfigurationKeys.AZKABAN_JOB_ID, "azkabanJobId")
       .put(ConfigurationKeys.AZKABAN_EXEC_ID, "azkabanExecId")
+      .put(ConfigurationKeys.GAAS_JOB_EXEC_ID,"gaasJobExecId")

Review Comment:
   nit: let's put a space after the comma



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -78,6 +80,27 @@ public void testSubmitNextNodesSuccess() throws 
URISyntaxException, IOException
     Mockito.verifyNoMoreInteractions(dagManagementStateStore);
   }
 
+  @Test
+  public void testGaaSJobExecutionIdInjection() throws URISyntaxException, 
IOException {
+    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+    List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
jobExecutionPlans.stream()
+        .map(Dag.DagNode<JobExecutionPlan>::new)
+        .collect(Collectors.toList());
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(),
 Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    // Assertion to test that GaaS job execution Id has been successfully 
injected
+    for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      final String gaasJobExecutionId = 
ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID,"");
+      final Long gaasJobExecutionIdHash = 
Long.parseLong(ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID_HASH,""));

Review Comment:
   nit: let's add a space after each comma



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1047,6 +1047,12 @@ public class ConfigurationKeys {
   public static final String AZKABAN_FLOW_ID = "azkaban.flow.flowid";
   public static final String AZKABAN_JOB_ID = "azkaban.job.id";
   public static final String AZKABAN_EXEC_ID = "azkaban.flow.execid";
+  // Configuration Key for setting a unique job execution identifier in GaaS, 
the value is a UUID
+  public static final String GAAS_JOB_EXEC_ID = "gaas.job.execid";
+
+  // Configuration Key for storing hash of gaas.job.execid, to be used as 
jobExecutionId(integer) for backwards compatibility
+  public static final String GAAS_JOB_EXEC_ID_HASH = 
"gaas.job.executionid.hash";

Review Comment:
   why does the rename work?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java:
##########
@@ -78,6 +80,27 @@ public void testSubmitNextNodesSuccess() throws 
URISyntaxException, IOException
     Mockito.verifyNoMoreInteractions(dagManagementStateStore);
   }
 
+  @Test
+  public void testGaaSJobExecutionIdInjection() throws URISyntaxException, 
IOException {
+    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
+    List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans();
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = 
jobExecutionPlans.stream()
+        .map(Dag.DagNode<JobExecutionPlan>::new)
+        .collect(Collectors.toList());
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    
Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(),
 Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
+    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
+    // Assertion to test that GaaS job execution Id has been successfully 
injected
+    for(JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      final String gaasJobExecutionId = 
ConfigUtils.getString(jobExecutionPlan.getJobSpec().getConfig(),ConfigurationKeys.GAAS_JOB_EXEC_ID,"");

Review Comment:
   nit: can we use `StringUtils.EMPTY` instead of `""`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to