This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 982a2dc  [GOBBLIN-1543] Set default for work unit size (#3394)
982a2dc is described below

commit 982a2dcf418f948ce1ee2ec7691c5b4fe40f8e33
Author: umustafi <[email protected]>
AuthorDate: Thu Sep 16 09:44:40 2021 -0700

    [GOBBLIN-1543] Set default for work unit size (#3394)
    
    A user encountered an exception when summing work units for retention jobs that 
do not have a size associated with them, so I am defaulting to progress reporting 
in terms of work units for cases that do not have a size associated with them.
---
 .../gobblin/runtime/AbstractJobLauncher.java       |  2 +-
 .../gobblin/runtime/TaskStateCollectorService.java | 26 ++++++++++++++++++----
 2 files changed, 23 insertions(+), 5 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 562d688..684a00a 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -663,7 +663,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
   @VisibleForTesting
   public static long sumWorkUnitsSizes (WorkUnitStream workUnitStream) {
     Collection<WorkUnit> workUnits = 
JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
-    long totalSizeInBytes = workUnits.stream().mapToLong(wu -> 
wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE)).sum();
+    long totalSizeInBytes = workUnits.stream().mapToLong(wu -> 
wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0)).sum();
     return totalSizeInBytes;
   }
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 9377727..4316463 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -90,6 +90,10 @@ public class TaskStateCollectorService extends 
AbstractScheduledService {
 
   private double bytesCopiedSoFar;
 
+  private double totalNumWorkUnits;
+
+  private double workUnitsCompletedSoFar;
+
   private double lastPercentageReported;
 
   /**
@@ -269,17 +273,31 @@ public class TaskStateCollectorService extends 
AbstractScheduledService {
 
     Long taskByteSize = Long.parseLong(stringSize);
 
-    // if progress reporting is enabled, value should be present
+    // If progress reporting is enabled, value should be present
     if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) {
       LOGGER.warn("Expected to report job progress but total bytes to copy 
property null");
       return;
     }
     this.totalSizeToCopy = 
this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE);
 
-    // avoid flooding Kafka message queue by sending GobblinTrackingEvents 
only when threshold is passed
-    this.bytesCopiedSoFar += taskByteSize;
-    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+    // If total size in bytes cannot be calculated, then default to progress 
reporting in terms of workunits
+    Double newPercentageCopied;
+    if (this.totalSizeToCopy == 0) {
+      this.totalNumWorkUnits = 
this.jobState.getPropAsLong(AbstractJobLauncher.NUM_WORKUNITS);
+      this.workUnitsCompletedSoFar += 1;
+
+      if (this.totalNumWorkUnits == 0) {
+        LOGGER.warn("Expected to report job progress but work units are not 
countable");
+        return;
+      }
+      newPercentageCopied = this.workUnitsCompletedSoFar / 
this.totalNumWorkUnits;
+    } else {
+      this.bytesCopiedSoFar += taskByteSize;
+      newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+    }
+
 
+    // Avoid flooding Kafka message queue by sending GobblinTrackingEvents 
only when threshold is passed
     // Report progress when it reaches 100% regardless of difference from 
lastPercentageReported
     if (newPercentageCopied - this.lastPercentageReported >= 
ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD ||
         (Math.abs(newPercentageCopied - 1.0) < 0.001)) {

Reply via email to