[ https://issues.apache.org/jira/browse/GOBBLIN-1493?focusedWorklogId=630786&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-630786 ]

ASF GitHub Bot logged work on GOBBLIN-1493:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Jul/21 21:07
            Start Date: 28/Jul/21 21:07
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678647641



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       Yes, I had noticed that other events do not need these three properties, either because they are not processed by the `KafkaAvroStatusMonitor` (e.g., in the case of `WorkUnitsCreated`) or because they add them explicitly. From printing the metadata, it seems this event only has the following keys, yet it needs the flow-level properties:
   `{jobName, jobId, azkabanJobId, azkabanFlowURL, azkabanJobExecURL, azkabanJobURL, azkabanProjectName, azkabanFlowId, azkabanExecId, azkabanURL, clusterIdentifier, metricContextID, metricContextName}`
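The threshold gating in `reportJobProgress`, together with explicitly attaching the flow-level fields discussed in the comment, can be illustrated with a minimal, self-contained sketch. The class `ProgressTracker`, the method `onTaskCompleted`, and the plain-string metadata keys are hypothetical stand-ins, not Gobblin APIs; the real code builds a `GobblinTrackingEvent` rather than returning a map. The sketch casts to `double` before dividing, because if both byte counters were `long`, Java's `/` would perform integer division and truncate the fraction.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of threshold-gated progress reporting.
// ProgressTracker, onTaskCompleted and the metadata keys are illustrative, not Gobblin APIs.
public class ProgressTracker {
  private final long totalBytesToCopy;
  private final double threshold; // e.g. 0.1 = report only after 10 more percentage points
  private long bytesCopiedSoFar = 0L;
  private double lastFractionReported = 0.0;

  public ProgressTracker(long totalBytesToCopy, double threshold) {
    if (totalBytesToCopy <= 0) {
      throw new IllegalArgumentException("totalBytesToCopy must be positive");
    }
    this.totalBytesToCopy = totalBytesToCopy;
    this.threshold = threshold;
  }

  /** Returns event metadata only when progress has advanced past the threshold. */
  public Optional<Map<String, String>> onTaskCompleted(
      long taskBytes, String flowGroup, String flowName, String flowExecutionId) {
    bytesCopiedSoFar += taskBytes;
    // Cast before dividing: long / long would truncate to 0 until the copy finishes.
    double fraction = (double) bytesCopiedSoFar / totalBytesToCopy;
    if (fraction - lastFractionReported <= threshold) {
      return Optional.empty(); // suppress the event to avoid flooding the queue
    }
    lastFractionReported = fraction;
    Map<String, String> metadata = new HashMap<>();
    metadata.put("jobCompletionPercentage", String.valueOf(Math.round(fraction * 100)));
    // Flow-level identifiers are added explicitly because the default metadata lacks them.
    metadata.put("flowGroup", flowGroup);
    metadata.put("flowName", flowName);
    metadata.put("flowExecutionId", flowExecutionId);
    return Optional.of(metadata);
  }

  public static void main(String[] args) {
    ProgressTracker tracker = new ProgressTracker(1000L, 0.1);
    long[] taskSizes = {50, 80, 200, 20, 650};
    for (long size : taskSizes) {
      tracker.onTaskCompleted(size, "myGroup", "myFlow", "12345")
          .ifPresent(m -> System.out.println(m.get("jobCompletionPercentage") + "%"));
    }
  }
}
```

Running `main` prints `13%`, `33%` and `100%`; the steps to 5% and to 35% are suppressed because each advances progress by no more than the 0.1 threshold. Because the comparison is strict, a job whose final increment falls below the threshold never reports 100% in this sketch, so sending one last event on completion may be worth considering.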






Issue Time Tracking
-------------------

    Worklog Id:     (was: 630786)
    Time Spent: 4.5h  (was: 4h 20m)

> Data Copy Progress Reporting 
> -----------------------------
>
>                 Key: GOBBLIN-1493
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1493
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core, gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Progress reporting for a data copy will give users quantitative feedback on
> a data copy job's progress: a completion percentage and an estimate of the
> time remaining. The existing job status endpoint will be updated to include
> both the progress percentage and the estimated time left.
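The description does not say how the time-left estimate would be computed. One simple option, assuming throughput stays roughly constant, is linear extrapolation: if a fraction f of the bytes took time t, the remainder takes about t(1 - f)/f. The sketch below illustrates that assumption only; `RemainingTimeEstimator` and `estimateRemaining` are hypothetical names, not part of Gobblin.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical sketch: linear extrapolation of the time remaining for a copy job. */
public final class RemainingTimeEstimator {
  private RemainingTimeEstimator() {}

  /**
   * Assumes roughly constant throughput. Returns empty until some progress exists,
   * because nothing can be extrapolated from zero bytes copied.
   */
  public static Optional<Duration> estimateRemaining(Duration elapsed, double fractionComplete) {
    if (fractionComplete <= 0.0) {
      return Optional.empty();
    }
    if (fractionComplete >= 1.0) {
      return Optional.of(Duration.ZERO);
    }
    // If `fractionComplete` of the bytes took `elapsed`, the rest takes elapsed * (1 - f) / f.
    double remainingMillis = elapsed.toMillis() * (1.0 - fractionComplete) / fractionComplete;
    return Optional.of(Duration.ofMillis(Math.round(remainingMillis)));
  }

  public static void main(String[] args) {
    // 25% copied after 10 minutes: about 30 more minutes at the same rate.
    estimateRemaining(Duration.ofMinutes(10), 0.25).ifPresent(System.out::println); // PT30M
  }
}
```

Early estimates from this approach are noisy, since a small f magnifies any variation in throughput; waiting for a minimum fraction before publishing an estimate may help.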



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
