arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r679507183



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       @umustafi these params are added in IvyJobLauncher. Let's remove them from here, and if that does not work, we will debug.
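A minimal sketch of the threshold-based reporting that `reportJobProgress` performs follows. It accumulates completed bytes and emits a rounded percentage only when progress has advanced by more than a threshold since the last report. The class and method names (`ProgressTracker`, `onTaskCompleted`) are hypothetical and are not part of Gobblin. The sketch also assumes that byte counts are `long` values, so it casts to `double` before dividing. Without that cast, `long / long` would truncate to 0 until the job finishes.

```java
import java.util.OptionalInt;

/**
 * Hypothetical sketch of threshold-based progress reporting (not Gobblin's API).
 * Emits a percentage only when progress has advanced by more than `threshold`
 * (a fraction, e.g. 0.1 = 10 percentage points) since the last emitted value.
 */
public class ProgressTracker {
  private final long totalBytes;           // assumed analogue of TOTAL_BYTES_TO_COPY
  private final double threshold;          // assumed analogue of the reporting threshold
  private long bytesCopiedSoFar = 0L;
  private double lastFractionReported = 0.0;

  public ProgressTracker(long totalBytes, double threshold) {
    if (totalBytes <= 0) {
      throw new IllegalArgumentException("totalBytes must be positive");
    }
    this.totalBytes = totalBytes;
    this.threshold = threshold;
  }

  /** Records one completed task; returns the percentage to report, or empty if below threshold. */
  public OptionalInt onTaskCompleted(long taskBytes) {
    bytesCopiedSoFar += taskBytes;
    // Cast before dividing so the fraction is not truncated by integer division.
    double fraction = (double) bytesCopiedSoFar / totalBytes;
    if (fraction - lastFractionReported > threshold) {
      lastFractionReported = fraction;
      return OptionalInt.of((int) Math.round(fraction * 100));
    }
    return OptionalInt.empty();
  }

  public static void main(String[] args) {
    ProgressTracker tracker = new ProgressTracker(1_000L, 0.1);
    for (long size : new long[] {50, 80, 300, 20, 550}) {
      tracker.onTaskCompleted(size).ifPresent(p -> System.out.println("progress " + p + "%"));
    }
    // prints "progress 13%", "progress 43%", "progress 100%" on separate lines
  }
}
```

The sketch reads the total once in the constructor instead of on every call. A strict threshold has one consequence: if the final increment is smaller than the threshold, 100% is never emitted. A caller may therefore want to force a report once `bytesCopiedSoFar` reaches the total.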

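The review suggests that the flow group, flow name, and flow execution id are already attached by `IvyJobLauncher`. In that case `reportJobProgress` would only need to send the completion percentage. How the launcher attaches them is not shown in this diff. The hypothetical sketch below assumes one common pattern: a submitter is created once with base metadata, and that metadata is merged into every event. `MetadataEventSubmitter`, `buildEvent`, and the key strings are illustrative names only, not Gobblin or TimingEvent constants.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: flow-identifying fields are registered once (e.g. by a launcher)
 * and merged into every event, so individual reporters only add the fields they own.
 */
public class MetadataEventSubmitter {
  private final Map<String, String> baseMetadata;

  public MetadataEventSubmitter(Map<String, String> baseMetadata) {
    // Defensive copy: later changes to the caller's map do not leak into events.
    this.baseMetadata = Collections.unmodifiableMap(new HashMap<>(baseMetadata));
  }

  /** Returns base metadata overlaid with per-event fields; per-event values win on collisions. */
  public Map<String, String> buildEvent(Map<String, String> eventFields) {
    Map<String, String> merged = new HashMap<>(baseMetadata);
    merged.putAll(eventFields);
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> flow = new HashMap<>();
    flow.put("flowGroup", "myGroup");                 // illustrative values
    flow.put("flowName", "myFlow");
    flow.put("flowExecutionId", "1");
    MetadataEventSubmitter submitter = new MetadataEventSubmitter(flow);

    // The progress reporter supplies only the percentage; flow fields come from the base.
    Map<String, String> event =
        submitter.buildEvent(Collections.singletonMap("jobCompletionPercentage", "42"));
    System.out.println(event);
  }
}
```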



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.