This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 982a2dc [GOBBLIN-1543] Set default for work unit size (#3394)
982a2dc is described below
commit 982a2dcf418f948ce1ee2ec7691c5b4fe40f8e33
Author: umustafi <[email protected]>
AuthorDate: Thu Sep 16 09:44:40 2021 -0700
[GOBBLIN-1543] Set default for work unit size (#3394)
A user encountered an exception when summing work unit sizes for retention jobs
whose work units have no size associated with them, so I am defaulting to progress
reporting in terms of work units for cases where no size is available.
---
.../gobblin/runtime/AbstractJobLauncher.java | 2 +-
.../gobblin/runtime/TaskStateCollectorService.java | 26 ++++++++++++++++++----
2 files changed, 23 insertions(+), 5 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 562d688..684a00a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -663,7 +663,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
@VisibleForTesting
public static long sumWorkUnitsSizes (WorkUnitStream workUnitStream) {
Collection<WorkUnit> workUnits =
JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
- long totalSizeInBytes = workUnits.stream().mapToLong(wu ->
wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE)).sum();
+ long totalSizeInBytes = workUnits.stream().mapToLong(wu ->
wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0)).sum();
return totalSizeInBytes;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 9377727..4316463 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -90,6 +90,10 @@ public class TaskStateCollectorService extends
AbstractScheduledService {
private double bytesCopiedSoFar;
+ private double totalNumWorkUnits;
+
+ private double workUnitsCompletedSoFar;
+
private double lastPercentageReported;
/**
@@ -269,17 +273,31 @@ public class TaskStateCollectorService extends
AbstractScheduledService {
Long taskByteSize = Long.parseLong(stringSize);
- // if progress reporting is enabled, value should be present
+ // If progress reporting is enabled, value should be present
if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) {
LOGGER.warn("Expected to report job progress but total bytes to copy
property null");
return;
}
this.totalSizeToCopy =
this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE);
- // avoid flooding Kafka message queue by sending GobblinTrackingEvents
only when threshold is passed
- this.bytesCopiedSoFar += taskByteSize;
- Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+ // If total size in bytes cannot be calculated, then default to progress
reporting in terms of workunits
+ Double newPercentageCopied;
+ if (this.totalSizeToCopy == 0) {
+ this.totalNumWorkUnits =
this.jobState.getPropAsLong(AbstractJobLauncher.NUM_WORKUNITS);
+ this.workUnitsCompletedSoFar += 1;
+
+ if (this.totalNumWorkUnits == 0) {
+ LOGGER.warn("Expected to report job progress but work units are not
countable");
+ return;
+ }
+ newPercentageCopied = this.workUnitsCompletedSoFar /
this.totalNumWorkUnits;
+ } else {
+ this.bytesCopiedSoFar += taskByteSize;
+ newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+ }
+
+ // Avoid flooding Kafka message queue by sending GobblinTrackingEvents
only when threshold is passed
// Report progress when it reaches 100% regardless of difference from
lastPercentageReported
if (newPercentageCopied - this.lastPercentageReported >=
ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD ||
(Math.abs(newPercentageCopied - 1.0) < 0.001)) {