Will-Lo commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677025828



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,20 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+          } else {
+            throw new RuntimeException("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
+                + "progress cannot be collected because " + DestinationDatasetHandlerService.class.getName()
+                + " does not support work unit streams. Turn off property " + ConfigurationKeys.REPORT_JOB_PROGRESS
+                + " and rerun job.");

Review comment:
       It's not necessarily because `DestinationDatasetHandler` doesn't support work unit streams; this feature doesn't support work unit streams either, since the total size is determined only once, at the beginning.
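To illustrate the point: if progress is reported as a share of a byte total fixed at launch, that total must come from a finite, materialized set of work units. The sketch below is hypothetical (the class name and the `List<Long>` of sizes are assumptions standing in for the job's work units and `TOTAL_BYTES_TO_COPY`). With an unbounded work unit stream there is no fixed list to sum, so the denominator would be undefined.

```java
import java.util.List;

// Hypothetical sketch: the job's total byte count is computed once, up front,
// from a finite (materialized) list of work-unit sizes. Progress is then the
// share of that fixed total covered by completed work units.
final class CopyProgressSketch {
  private final long totalBytesToCopy; // fixed at job start, never updated
  private long bytesCompleted = 0;

  CopyProgressSketch(List<Long> workUnitSizes) {
    long total = 0;
    for (long size : workUnitSizes) {
      total += size;
    }
    this.totalBytesToCopy = total;
  }

  /** Records one completed work unit and returns progress as a whole percentage. */
  int onWorkUnitCompleted(long workUnitBytes) {
    bytesCompleted += workUnitBytes;
    if (totalBytesToCopy == 0) {
      return 100; // nothing to copy; treating this as complete is an assumption
    }
    return (int) (bytesCompleted * 100 / totalBytesToCopy);
  }
}
```

For work units of 100 and 300 bytes, completing the first reports 25 and completing the second reports 100.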

##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -382,9 +384,17 @@ public Void call() {
           setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
           computeAndSetWorkUnitGuid(workUnit);
           addLineageInfo(copyEntity, workUnit);
-          if (copyEntity instanceof CopyableFile && DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
-            workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+          if (copyEntity instanceof CopyableFile) {
+            CopyableFile castedCopyEntity = (CopyableFile) copyEntity;
+            fileSize = castedCopyEntity.getFileStatus().getLen();
+            workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, fileSize);
+            if (DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
+              workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+            } else {
+              workUnitsForPartition.add(workUnit);
+            }
           } else {
+            workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, 0);

Review comment:
       Can you leave a comment explaining when this branch applies? If I recall correctly, it's for some state files or post-publish steps.
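A minimal sketch of the kind of comment requested, using simplified stand-in types. `CopyEntitySketch`, `CopyableFileSketch`, `PostPublishStepSketch` and the key string are all hypothetical, not Gobblin's API. The `else` comment records the reviewer's recollection that this branch covers state files and post-publish steps; that should be checked against the actual `CopyEntity` subclasses before committing it.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins for CopyEntity, CopyableFile and WorkUnit properties.
interface CopyEntitySketch {}

final class CopyableFileSketch implements CopyEntitySketch {
  final long lengthInBytes; // stands in for getFileStatus().getLen()

  CopyableFileSketch(long lengthInBytes) {
    this.lengthInBytes = lengthInBytes;
  }
}

final class PostPublishStepSketch implements CopyEntitySketch {}

final class WorkUnitSizeSketch {
  // Hypothetical key; stands in for ServiceConfigKeys.WORK_UNIT_BYTE_SIZE.
  static final String WORK_UNIT_BYTE_SIZE = "workUnit.byteSize";

  static void setByteSize(CopyEntitySketch copyEntity, Map<String, String> workUnitProps) {
    if (copyEntity instanceof CopyableFileSketch) {
      long fileSize = ((CopyableFileSketch) copyEntity).lengthInBytes;
      workUnitProps.put(WORK_UNIT_BYTE_SIZE, Long.toString(fileSize));
    } else {
      // Non-file copy entities (e.g. post-publish steps or state files found in
      // the dataset directory) copy no file bytes, so they contribute 0 to the
      // job's total bytes to copy.
      workUnitProps.put(WORK_UNIT_BYTE_SIZE, "0");
    }
  }
}
```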

##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -382,9 +384,17 @@ public Void call() {
           setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
           computeAndSetWorkUnitGuid(workUnit);
           addLineageInfo(copyEntity, workUnit);
-          if (copyEntity instanceof CopyableFile && DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
-            workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+          if (copyEntity instanceof CopyableFile) {
+            CopyableFile castedCopyEntity = (CopyableFile) copyEntity;
+            fileSize = castedCopyEntity.getFileStatus().getLen();
+            workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, fileSize);
+            if (DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
+              workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+            } else {
+              workUnitsForPartition.add(workUnit);
+            }
           } else {
+            workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, 0);

Review comment:
       Can we leave a comment explaining when this case is true? From what I remember, it had to do with local state files or post-publish steps also existing in the directory.

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +660,15 @@ public void apply(JobListener jobListener, JobContext jobContext)
     }
   }
 
+  public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {

Review comment:
       If you add the `@VisibleForTesting` annotation, the method should be protected or private; in this case I think protected is best.
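A sketch of the suggested visibility, assuming a `Map<String, String>` of properties stands in for each `WorkUnit` and a hypothetical key string stands in for `ServiceConfigKeys.WORK_UNIT_BYTE_SIZE`. In Java, `protected` also grants package access, so a test in the same package can still call the method without it becoming public API. The Guava annotation appears only as a comment so the example compiles without Guava.

```java
import java.util.Collection;
import java.util.Map;

// Sketch only: the real method would take Collection<WorkUnit> and carry
// Guava's @com.google.common.annotations.VisibleForTesting.
class JobLauncherSketch {
  // Hypothetical key; stands in for ServiceConfigKeys.WORK_UNIT_BYTE_SIZE.
  static final String WORK_UNIT_BYTE_SIZE = "workUnit.byteSize";

  // @VisibleForTesting
  protected static long sumWorkUnitsSizes(Collection<Map<String, String>> workUnits) {
    long total = 0;
    for (Map<String, String> workUnit : workUnits) {
      // A missing size is treated as 0 (an assumption of this sketch).
      total += Long.parseLong(workUnit.getOrDefault(WORK_UNIT_BYTE_SIZE, "0"));
    }
    return total;
  }
}
```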

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,20 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+          } else {
+            throw new RuntimeException("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
+                + "progress cannot be collected because " + DestinationDatasetHandlerService.class.getName()
+                + " does not support work unit streams. Turn off property " + ConfigurationKeys.REPORT_JOB_PROGRESS
+                + " and rerun job.");

Review comment:
       We could put a better exception message here to let the user know that copy-progress reporting doesn't work for infinite streams (which makes sense: how would we determine that the work is complete?).
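One possible wording, sketched as a hypothetical helper (`ProgressPreconditionSketch` and `checkProgressReportable` are not in the PR; the key argument would be `ConfigurationKeys.REPORT_JOB_PROGRESS`). It throws `IllegalStateException`, a subclass of the `RuntimeException` used in the diff, and states the real limitation instead of attributing it to `DestinationDatasetHandlerService`.

```java
// Hypothetical helper sketching a clearer failure message for progress reporting.
final class ProgressPreconditionSketch {
  static void checkProgressReportable(boolean safeToMaterialize, String reportProgressKey) {
    if (!safeToMaterialize) {
      throw new IllegalStateException("Property " + reportProgressKey + " is enabled, but job "
          + "progress cannot be reported for this job: progress is measured against a total "
          + "byte count computed once at launch, which requires a finite set of work units. "
          + "This job produces a work unit stream that cannot be materialized (for example, "
          + "an unbounded stream). Disable " + reportProgressKey + " and rerun the job.");
    }
  }
}
```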

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+    // if progress reporting is enabled, value should be present
+    String strTotalSizeToCopy = this.jobState.getProp(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);

Review comment:
       You can use `PropertiesUtils.getPropAsLong` here (and for the task byte size) instead of reading the String. Instead of checking for null, check that the property exists in the state.
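A minimal sketch of the presence-check pattern, using `java.util.Properties` in place of Gobblin's `State`. `getLongIfPresent` is a hypothetical helper; in the real code the equivalent calls would go through `State` or the `PropertiesUtils` helper mentioned above, whose exact signatures should be checked.

```java
import java.util.OptionalLong;
import java.util.Properties;

// Hypothetical sketch: check that the key exists, then parse it as a long,
// instead of fetching a String and comparing it to null.
final class ProgressPropsSketch {
  static OptionalLong getLongIfPresent(Properties state, String key) {
    if (!state.containsKey(key)) {
      return OptionalLong.empty();
    }
    return OptionalLong.of(Long.parseLong(state.getProperty(key).trim()));
  }
}
```

In `reportJobProgress`, an empty result would trigger the existing warning and early return, and the same helper would cover `TOTAL_BYTES_TO_COPY`.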

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -105,6 +105,8 @@ protected JobStatus getJobStatus(State jobState) {
     int maxAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "1"));
     int currentAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, "1"));
     boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
+    int progressPercentage = Integer.parseInt(jobState.getProp(TimingEvent.JOB_COMPLETION_PERCENTAGE, "0"));
+    long lastProgressEventTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_LAST_PROGRESS_PERCENT_TIME, "0"));

Review comment:
       Similar comment here: you can use `PropertiesUtils` to parse the ints/longs.
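For the default-valued fields in this hunk, a sketch of the kind of helpers meant, written against `java.util.Properties`. The `getPropAsInt`/`getPropAsLong` methods here are local stand-ins, not Gobblin's `PropertiesUtils` API. They keep the parse-and-default logic in one place instead of repeating `Integer.parseInt(jobState.getProp(..., "0"))` at each call site.

```java
import java.util.Properties;

// Hypothetical stand-ins for PropertiesUtils-style helpers: parse an int/long
// property, falling back to a default when the key is absent.
final class ParseWithDefaultsSketch {
  static int getPropAsInt(Properties props, String key, int defaultValue) {
    String value = props.getProperty(key);
    return value == null ? defaultValue : Integer.parseInt(value.trim());
  }

  static long getPropAsLong(Properties props, String key, long defaultValue) {
    String value = props.getProperty(key);
    return value == null ? defaultValue : Long.parseLong(value.trim());
  }
}
```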

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +660,15 @@ public void apply(JobListener jobListener, JobContext jobContext)
     }
   }
 
+  public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {

Review comment:
       If you mark it `@VisibleForTesting`, you should make the method private or protected (probably protected in this scenario).




-- 
This is an automated message from the Apache Git Service.