[https://issues.apache.org/jira/browse/GOBBLIN-1493?focusedWorklogId=626948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-626948]

ASF GitHub Bot logged work on GOBBLIN-1493:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Jul/21 00:20
            Start Date: 23/Jul/21 00:20
    Worklog Time Spent: 10m 
      Work Description: aplex commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r675209138



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatistics.pdl
##########
@@ -19,4 +19,14 @@ record JobStatistics {
    * number of records processed in the last job execution
    */
   processedCount: long
+
+  /**
+   * data copy progress as a percentage
+   */
+  copyProgress: long
+
+  /**
+   * estimate of time left until copy completion
+   */
+  estimatedTimeLeft: long

Review comment:
       What is the unit of time here? It could be worth adding the unit to the name, e.g. estimatedSecondsToCompletion, estimatedTimeLeftSeconds, or something similar.

##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +204,17 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
   private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
     return flowStatus.getFlowExecutionId();
   }
+
+  /**
+   * Estimate the time left to complete the copy based on the following formula -
+   * timeLeft = (100/completionPercentage) * timeElapsed
+   */
+  public static long estimateCopyTimeLeft(Long currentTime, Long startTime, Long completionPercentage) {

Review comment:
       Inside the code, I suggest using the Duration/Interval classes to represent time periods. That removes any question about time units. Rest.li does not have a built-in type for time, so we had to convert it to long or string.
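A minimal sketch of the Duration suggestion, assuming java.time is acceptable in this module; CopyTimeEstimator and estimateTimeLeft are hypothetical names, not part of the PR. One detail worth checking: the Javadoc formula (100/completionPercentage) * timeElapsed projects the total duration of the copy rather than the time left. For 30 units elapsed at 10% it gives 300 (the value FlowExecutionResourceLocalHandlerTest asserts), whereas the remaining time would be 270. The sketch subtracts the elapsed time, and multiplies before dividing so that 100/completionPercentage is not truncated by integer division.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: estimates remaining copy time from elapsed time and percent complete. */
final class CopyTimeEstimator {

  private CopyTimeEstimator() {
  }

  /**
   * Linear extrapolation: if completionPercent of the work took `elapsed`, the whole copy is
   * projected to take elapsed * 100 / completionPercent; the time left is that projection
   * minus the time already elapsed.
   */
  static Duration estimateTimeLeft(Instant start, Instant now, long completionPercent) {
    if (completionPercent <= 0 || completionPercent > 100) {
      throw new IllegalArgumentException(
          "completionPercent must be in (0, 100], got " + completionPercent);
    }
    Duration elapsed = Duration.between(start, now);
    // Multiply before dividing so that 100 / completionPercent is not truncated.
    Duration projectedTotal = elapsed.multipliedBy(100).dividedBy(completionPercent);
    return projectedTotal.minus(elapsed);
  }
}
```

With this shape, the conversion to a raw long (for example via getSeconds()) happens only at the Rest.li boundary, which is also where the unit belongs in the field name.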

##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatistics.pdl
##########
@@ -19,4 +19,14 @@ record JobStatistics {
    * number of records processed in the last job execution
    */
   processedCount: long
+
+  /**
+   * data copy progress as a percentage
+   */
+  copyProgress: long

Review comment:
       I think this is not specific to copy jobs; ingestion jobs can have progress too. For example, we were chatting about having MySQL ingestion through GaaS.

##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandlerTest.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class FlowExecutionResourceLocalHandlerTest {
+
+  @Test
+  public void testEstimateCopyTimeLeftSimple() throws Exception {
+    long currentTime = 50;
+    long startTime = 20;
+    long copyPercentage = 10;
+
+    long timeLeft = FlowExecutionResourceLocalHandler.estimateCopyTimeLeft(currentTime, startTime, copyPercentage);
+    Assert.assertEquals(timeLeft,300);

Review comment:
       There is a missing space after the comma. For new files, you can simply run the "Silent Code Cleanup" command in IntelliJ; it fixes all code style issues in the file. You can bind it to a shortcut for convenience. I usually don't pay attention to setting those spaces and new lines, and just reformat the file all the time.
   
   For existing files, you can select the code block that you've changed and reformat it the same way. As you'll find out, some old files have existing code style issues, and if you reformat the whole file, a lot of unrelated lines will end up in the PR.

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +660,15 @@ public void apply(JobListener jobListener, JobContext jobContext)
     }
   }
 
+  public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {

Review comment:
       It looks like this method is public only to allow testing it. You can add the @VisibleForTesting annotation to signal that; otherwise, folks will suggest that you make it private.

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +468,17 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_COPY_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+          } else {
+            throw new RuntimeException(DestinationDatasetHandlerService.class.getName() + " does not support work unit streams");

Review comment:
       You can add more context here, so that people who see it in the logs know what to do. For example, you can say that a specific property is turned on, but progress cannot be collected because ... Then you can suggest that people either turn property X off or use a different Z to get this working.
   
   Detailed error messages reduce support and on-call volume and create a better overall experience for users and SREs. I wrote more about this in the doc - https://docs.google.com/document/d/154girrJkI_hFNAKiHEyT3T4RtHclxqO15uzaXga74Uw/edit#heading=h.cfg1x3y98n7i
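As an illustration of the kind of message suggested above, a sketch of an actionable exception for the case where progress reporting is on but the work unit stream is not safe to materialize. ProgressReportingErrors and unsupportedWorkUnitStream are hypothetical; the property key is passed in because the actual value of ConfigurationKeys.REPORT_JOB_COPY_PROGRESS is not shown in this review, and the remedies listed are examples rather than confirmed Gobblin behavior.

```java
/** Hypothetical helper that builds an actionable error for the non-materializable stream case. */
final class ProgressReportingErrors {

  private ProgressReportingErrors() {
  }

  /** Names the job, the property that triggered the check, why it failed, and how to fix it. */
  static IllegalStateException unsupportedWorkUnitStream(String jobName, String progressPropertyKey) {
    String message = String.format(
        "Job '%s' has %s=true, but copy progress cannot be reported: its work units are a stream "
            + "that is not safe to materialize, so the total number of bytes to copy cannot be "
            + "computed before the copy starts. Either set %s=false for this job, or use a source "
            + "whose work units can be materialized.",
        jobName, progressPropertyKey, progressPropertyKey);
    return new IllegalStateException(message);
  }
}
```

In the else branch this would read as `throw ProgressReportingErrors.unsupportedWorkUnitStream(jobName, ConfigurationKeys.REPORT_JOB_COPY_PROGRESS);`, with IllegalStateException chosen over a bare RuntimeException because the failure comes from the job's configuration state.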

##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatistics.pdl
##########
@@ -19,4 +19,14 @@ record JobStatistics {
    * number of records processed in the last job execution
    */
   processedCount: long
+
+  /**
+   * data copy progress as a percentage
+   */
+  copyProgress: long

Review comment:
       I assume this will go from 0 to 100, so we can use int here. You can 
also add the possible range to the comment above for clarity.

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -206,12 +222,53 @@ public Void call() throws Exception {
 
     LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
 
+    Long taskByteSize;
+    String stringSize;
+    Double newPercentageCopied;
     // Add the TaskStates of completed tasks to the JobState so when the control
     // returns to the launcher, it sees the TaskStates of all completed tasks.
     for (TaskState taskState : taskStateQueue) {
       consumeTaskIssues(taskState);
       taskState.setJobState(this.jobState);
       this.jobState.addTaskState(taskState);
+
+      if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_COPY_PROGRESS, false)) {
+        stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+        if (stringSize == null) {
+          LOGGER.warn("Expected to report data copy progress but work unit byte size property null");
+          break;
+        }
+
+        taskByteSize = Long.parseLong(stringSize);
+        // if progress reporting is enabled, value should be present
+        String strTotalSizeToCopy = this.jobState.getProp(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+        if (strTotalSizeToCopy == null) {
+          LOGGER.warn("Expected to report data copy progress but total bytes to copy property null");
+          break;
+        }
+
+        this.totalSizeToCopy = Long.parseLong(strTotalSizeToCopy);
+        // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+        this.bytesCopiedSoFar += taskByteSize;
+        newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+        if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+          this.lastPercentageReported = newPercentageCopied;
+          Long percentageToReport = (long) (this.lastPercentageReported * 100);

Review comment:
       Should we do a Math.round() here? By default the value is truncated when you convert double to long, so 99.999 becomes 99, not 100. We could end up with jobs that are fully complete but still show 99% progress.
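A sketch that combines this with the earlier suggestion to use an int in the 0 to 100 range; ProgressPercent.toPercent is a hypothetical helper. It also casts to double before dividing: if bytesCopiedSoFar and totalSizeToCopy are both long fields (their declarations are not shown in the diff), long / long is integer division and would yield 0 until the copy finishes.

```java
/** Hypothetical helper converting byte counts into an integer percentage in [0, 100]. */
final class ProgressPercent {

  private ProgressPercent() {
  }

  static int toPercent(long bytesCopied, long totalBytes) {
    if (totalBytes <= 0) {
      throw new IllegalArgumentException("totalBytes must be positive, got " + totalBytes);
    }
    // Cast before dividing: long / long would be integer division.
    double fraction = (double) bytesCopied / totalBytes;
    // (long) (fraction * 100) truncates 99.999 to 99; Math.round rounds it to 100.
    long rounded = Math.round(fraction * 100);
    // Clamp in case the summed task sizes overshoot the precomputed total.
    return (int) Math.max(0, Math.min(100, rounded));
  }
}
```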

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -206,12 +222,53 @@ public Void call() throws Exception {
 
     LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
 
+    Long taskByteSize;
+    String stringSize;
+    Double newPercentageCopied;
     // Add the TaskStates of completed tasks to the JobState so when the control
     // returns to the launcher, it sees the TaskStates of all completed tasks.
     for (TaskState taskState : taskStateQueue) {
       consumeTaskIssues(taskState);
       taskState.setJobState(this.jobState);
       this.jobState.addTaskState(taskState);
+

Review comment:
       This method is already quite complicated. Generally, we should try to have no more than a screen of code in each method, to make it easier to read and understand. You can try moving the logic to a separate method or class, whichever makes more sense.
   
   Fields like "lastPercentageReported" would be more meaningful in a separate class than in the task state collector. That better matches the single responsibility principle - https://en.wikipedia.org/wiki/SOLID
   

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
##########
@@ -163,6 +163,9 @@ public GobblinTrackingEvent deserializeEvent(DecodeableKafkaRecord<byte[],byte[]
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.CANCELLED.name());
         properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
+      case TimingEvent.JOB_COMPLETION_PERCENTAGE:
+        properties.put(TimingEvent.JOB_LAST_PROGRESS_PERCENT_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));

Review comment:
       Is METADATA_END_TIME the correct value here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 626948)
    Time Spent: 1h  (was: 50m)

> Data Copy Progress Reporting 
> -----------------------------
>
>                 Key: GOBBLIN-1493
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1493
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-core, gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Progress reporting for a data copy will provide users with quantitative 
> feedback on the progress of a data copy job as a percentage as well as an 
> estimate of the time remaining for completion. This will update the existing 
> job status endpoint to include the progress percentage and estimate of time 
> left. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
