umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678647641
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
}
+ /**
+ * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+ * GobblinTrackingEvents
+ * @param taskState of job launched
+ */
+ private void reportJobProgress(TaskState taskState) {
+ String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+ if (stringSize == null) {
+ LOGGER.warn("Expected to report job progress but work unit byte size property null");
+ return;
+ }
+
+ Long taskByteSize = Long.parseLong(stringSize);
+
+ // if progress reporting is enabled, value should be present
+ if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+ LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+ return;
+ }
+ this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+ // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+ this.bytesCopiedSoFar += taskByteSize;
+ Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+ if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+ this.lastPercentageReported = newPercentageCopied;
+ int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+ Map<String, String> progress = new HashMap<>();
+ progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+ progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+ this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+ progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+ this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+ progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
Review comment:
Yes, I had noticed that other events do not need these three properties,
either because they are not processed by the `KafkaAvroStatusMonitor` (e.g.
`WorkUnitsCreated`) or because they add them explicitly. From printing the
metadata, it seems to contain only the following keys, yet this event still
needs the flow-level props:
`{jobName, jobId, azkabanJobId=autotest-hive-copy_hive-copy, azkabanFlowURL,
azkabanJobExecURL, azkabanJobURL, azkabanProjectName, azkabanFlowId,
azkabanExecId, azkabanURL, clusterIdentifier, metricContextID,
metricContextName}`
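
To illustrate the point, here is a minimal, self-contained sketch of copying the flow-level props into event metadata only when they are missing. It uses plain `java.util` types rather than Gobblin classes, and the class name, method name, and key strings (`flowGroup`, `flowName`, `flowExecutionId`) are hypothetical placeholders, not the actual constants from `ConfigurationKeys` or `TimingEvent.FlowEventConstants`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative only: not part of Gobblin.
final class ProgressMetadataSketch {
  // Hypothetical key names standing in for the real flow-level constants.
  static final String[] FLOW_LEVEL_KEYS = {"flowGroup", "flowName", "flowExecutionId"};

  /**
   * Returns a copy of the event metadata with any missing flow-level props
   * filled in from the job properties. Existing entries are never overwritten,
   * and keys absent from the job properties are skipped.
   */
  static Map<String, String> withFlowLevelProps(Map<String, String> eventMetadata, Properties jobProps) {
    Map<String, String> merged = new HashMap<>(eventMetadata);
    for (String key : FLOW_LEVEL_KEYS) {
      String value = jobProps.getProperty(key);
      if (value != null) {
        merged.putIfAbsent(key, value);
      }
    }
    return merged;
  }
}
```

Whether the props belong at the call site, as in this PR, or in a shared helper like the one sketched is a design choice; the sketch only shows that the merge itself is small.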