umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r679561366
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ /**
+ * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+ * GobblinTrackingEvents
+ * @param taskState of job launched
+ */
+ private void reportJobProgress(TaskState taskState) {
+ String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+ if (stringSize == null) {
+ LOGGER.warn("Expected to report job progress but work unit byte size property null");
+ return;
+ }
+
+ Long taskByteSize = Long.parseLong(stringSize);
+
+ // if progress reporting is enabled, value should be present
+ if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+ LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+ return;
+ }
+ this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+ // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+ this.bytesCopiedSoFar += taskByteSize;
+ Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+ if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+ this.lastPercentageReported = newPercentageCopied;
+ int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+ Map<String, String> progress = new HashMap<>();
+ progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+ progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+ this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+ progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+ this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+ progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
Review comment:
Removed them because they should already be present in the metric context.
From a previous check, I know they are not in jobState.
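
A rough sketch of the idea, assuming the event emitter attaches the metric context's tags to every event it submits. Plain maps stand in for the metric context here; the class name, the method `mergeWithContextTags`, and the key strings are hypothetical illustrations, not Gobblin's API:

```java
import java.util.HashMap;
import java.util.Map;

class ProgressEventSketch {

  // Hypothetical helper: combines context-level tags with the fields of one event.
  // Assumption: event-specific fields take precedence on a key collision.
  static Map<String, String> mergeWithContextTags(Map<String, String> contextTags,
      Map<String, String> eventFields) {
    Map<String, String> merged = new HashMap<>(contextTags);
    merged.putAll(eventFields);
    return merged;
  }

  public static void main(String[] args) {
    // Flow identifiers live once in the (stand-in) metric context.
    Map<String, String> contextTags = new HashMap<>();
    contextTags.put("flowGroup", "exampleGroup");
    contextTags.put("flowName", "exampleFlow");
    contextTags.put("flowExecutionId", "12345");

    // The progress event itself only needs to carry the percentage.
    Map<String, String> progress = new HashMap<>();
    progress.put("jobCompletionPercentage", "42");

    System.out.println(mergeWithContextTags(contextTags, progress));
  }
}
```

Under that assumption, the progress map only has to carry the completion percentage, and reading the flow fields from jobState (where they are absent) would just add null entries.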