phet commented on code in PR #3894:
URL: https://github.com/apache/gobblin/pull/3894#discussion_r1520485487


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java:
##########
@@ -154,6 +154,7 @@ public org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEven
         break;
       case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
       case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
+      case TimingEvent.LauncherTimings.JOB_COMPLETE:

Review Comment:
   Is the alternative to adding this in `KafkaAvroJobStatusMonitor` to have 
`ExecuteGobblinWorkflowImpl` emit `JOB_SUCCEEDED` instead? If that's what 
gobblin-on-MR emitted, I'd prefer to fix the new producer rather than change 
the consumer side.
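
The two options in this comment can be contrasted with a minimal sketch. Everything here is a hypothetical stand-in: `EventNames`, `StatusConsumerSketch`, and `ProducerSketch` are not Gobblin classes, and the string values are placeholders rather than the real `TimingEvent` constants.

```java
// Placeholder event names; the real values live in Gobblin's TimingEvent (assumption: not reproduced here).
final class EventNames {
  static final String JOB_SUCCEEDED = "JOB_SUCCEEDED";
  static final String JOB_COMPLETE = "JOB_COMPLETE";
}

final class StatusConsumerSketch {
  // Option A (what the diff does): widen the consumer so both event names map to success.
  static String parseWidened(String eventName) {
    switch (eventName) {
      case EventNames.JOB_SUCCEEDED:
      case EventNames.JOB_COMPLETE:
        return "COMPLETE";
      default:
        return "UNKNOWN";
    }
  }

  // Option B (what the reviewer prefers): the consumer keeps recognizing only JOB_SUCCEEDED.
  static String parseStrict(String eventName) {
    switch (eventName) {
      case EventNames.JOB_SUCCEEDED:
        return "COMPLETE";
      default:
        return "UNKNOWN";
    }
  }
}

final class ProducerSketch {
  // Under option B the new producer emits the name the existing consumer already understands.
  static String emitOnSuccess() {
    return EventNames.JOB_SUCCEEDED;
  }
}
```

With option B, `StatusConsumerSketch.parseStrict(ProducerSketch.emitOnSuccess())` resolves to the success status without any consumer-side change, which is the point of fixing the producer.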



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java:
##########
@@ -30,6 +31,8 @@
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;

Review Comment:
   aren't the hadoop imports supposed to precede the gobblin ones?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java:
##########
@@ -84,13 +87,46 @@ public void submitJob(List<WorkUnit> workunits) {
       Config jobConfigWithOverrides = applyJobLauncherOverrides(ConfigUtils.propertiesToConfig(this.jobProps));
 
       Help.propagateGaaSFlowExecutionContext(this.jobProps);
-
-      EventSubmitterContext eventSubmitterContext = new EventSubmitterContext(this.eventSubmitter);
-
+      EventSubmitterContext eventSubmitterContext = new EventSubmitterContext(
+          addAdditionalMetadataTags(this.jobProps, this.eventSubmitter.getTags()),
+          eventSubmitter.getNamespace()
+      );
       int numWorkUnits = workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
       log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", numWorkUnits);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
+
+  private static List<Tag<?>> addAdditionalMetadataTags(Properties jobProps, List<Tag<?>> additionalTags) {

Review Comment:
   If this is core gobblin-on-temporal functionality, how about making it a 
method of `EventSubmitterContext`?
   
   e.g. call it as:
   ```
   EventSubmitterContext eventSubmitterContext = new EventSubmitterContext(this.eventSubmitter).withAugmentedMetadata(this.jobProps);
   ```
   (`withAugmentedMetadata` would return a new `EventSubmitterContext` that's 
updated to carry the new `Tag`s.)
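
A rough sketch of what such a method might look like, under stated assumptions: `SketchTag` and `SketchEventSubmitterContext` are simplified, hypothetical stand-ins for Gobblin's `Tag` and `EventSubmitterContext`, and the property keys in `METADATA_KEYS` are illustrative only, not the set the PR actually propagates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical stand-in for Gobblin's Tag<T>; only a key/value pair is modeled.
final class SketchTag<T> {
  private final String key;
  private final T value;

  SketchTag(String key, T value) {
    this.key = key;
    this.value = value;
  }

  String getKey() { return key; }
  T getValue() { return value; }
}

// Hypothetical, simplified stand-in for EventSubmitterContext (tags plus namespace).
final class SketchEventSubmitterContext {
  // Assumption: illustrative job properties to copy into tags.
  private static final List<String> METADATA_KEYS = Arrays.asList("job.name", "job.id");

  private final List<SketchTag<?>> tags;
  private final String namespace;

  SketchEventSubmitterContext(List<SketchTag<?>> tags, String namespace) {
    // Defensive copy so the context cannot be mutated through the caller's list.
    this.tags = Collections.unmodifiableList(new ArrayList<>(tags));
    this.namespace = namespace;
  }

  List<SketchTag<?>> getTags() { return tags; }
  String getNamespace() { return namespace; }

  // Returns a NEW context with extra tags derived from jobProps; this instance is left unchanged.
  SketchEventSubmitterContext withAugmentedMetadata(Properties jobProps) {
    List<SketchTag<?>> augmented = new ArrayList<>(tags);
    for (String key : METADATA_KEYS) {
      String value = jobProps.getProperty(key);
      if (value != null) {
        augmented.add(new SketchTag<>(key, value));
      }
    }
    return new SketchEventSubmitterContext(augmented, namespace);
  }
}
```

Returning a new instance instead of mutating in place keeps the context safe to share, and it moves the tag-building logic out of `ExecuteGobblinJobLauncher`, which appears to be the intent of the suggestion.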


