This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 003590e7d5 [GOBBLIN-2179] Enhance GoT observability with 
`WorkUnitsSizeSummary` and `WorkUnitSizeInfo` (#4082)
003590e7d5 is described below

commit 003590e7d58691505d8fb73988db35ae636873f1
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Dec 10 21:16:22 2024 -0800

    [GOBBLIN-2179] Enhance GoT observability with `WorkUnitsSizeSummary` and 
`WorkUnitSizeInfo` (#4082)
---
 .../AutoTroubleshooterLogAppender.java             |   2 +-
 .../gobblin/runtime/troubleshooter/Issue.java      |   6 +-
 gobblin-temporal/build.gradle                      |   1 +
 .../temporal/ddm/activity/GenerateWorkUnits.java   |   5 +
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   | 112 ++++++++++++++---
 .../gobblin/temporal/ddm/util/JobStateUtils.java   |  18 ++-
 ...EagerFsDirBackedWorkUnitClaimCheckWorkload.java |  29 ++++-
 .../temporal/ddm/work/GenerateWorkUnitsResult.java |   3 +
 .../ddm/work/PriorJobStateWUProcessingSpec.java    |   3 +-
 .../temporal/ddm/work/WorkUnitClaimCheck.java      |   6 +-
 ...kUnitsResult.java => WorkUnitsSizeSummary.java} |  25 ++--
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  |  34 +++--
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |   4 +-
 .../temporal/dynamic/ScalingDirectiveSource.java   |   2 +-
 .../gobblin/temporal/dynamic/WorkforcePlan.java    |   2 +-
 .../workflows/metrics/TemporalEventTimer.java      |   6 +-
 .../activity/impl/GenerateWorkUnitsImplTest.java   |  95 +++++++++++++-
 ...rFsDirBackedWorkUnitClaimCheckWorkloadTest.java | 109 ++++++++++++++++
 gobblin-utility/build.gradle                       |   1 +
 .../org/apache/gobblin/util/JobLauncherUtils.java  |  19 ++-
 .../org/apache/gobblin/util/WorkUnitSizeInfo.java  | 138 +++++++++++++++++++++
 .../apache/gobblin/util/JobLauncherUtilsTest.java  |  90 ++++++++++++++
 .../apache/gobblin/util/WorkUnitSizeInfoTest.java  | 104 ++++++++++++++++
 gradle/scripts/dependencyDefinitions.gradle        |   1 +
 24 files changed, 754 insertions(+), 61 deletions(-)

diff --git 
a/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java
 
b/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java
index bb562bbe81..c3304ccef0 100644
--- 
a/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java
+++ 
b/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java
@@ -44,7 +44,7 @@ import 
org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
 
 
 /**
- * Collects messages from log4j and converts them into issues that are used in 
{@link AutomaticTroubleshooter}.
+ * Collects messages from log4j and converts them into {@link Issue}s used by 
the {@link org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter}.
  */
 @Slf4j
 @ThreadSafe
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
index 7f694b2032..c726931e95 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
@@ -50,7 +50,7 @@ public class Issue {
    *
    * It can be used for making programmatic decisions on how to handle and 
recover from this issue.
    *
-   * The code length should be less than {@link Issue.MAX_ISSUE_CODE_LENGTH}
+   * The code length should be less than {@link Issue#MAX_ISSUE_CODE_LENGTH}
    * */
   private final String code;
 
@@ -71,14 +71,14 @@ public class Issue {
    *
    * This is a full name of the class that logged the error or generated the 
issue.
    *
-   * The class name length should be less than {@link 
Issue.MAX_CLASSNAME_LENGTH}
+   * The class name length should be less than {@link 
Issue#MAX_CLASSNAME_LENGTH}
    * */
   private final String sourceClass;
 
   /**
    * If the issue was generated from an exception, then a full exception class 
name should be stored here.
    *
-   * The class name length should be less than {@link 
Issue.MAX_CLASSNAME_LENGTH}
+   * The class name length should be less than {@link 
Issue#MAX_CLASSNAME_LENGTH}
    */
   private final String exceptionClass;
 
diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle
index 1832ac909d..fa34245e91 100644
--- a/gobblin-temporal/build.gradle
+++ b/gobblin-temporal/build.gradle
@@ -62,6 +62,7 @@ dependencies {
     compile (externalDependency.helix) {
         exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
     }
+    compile externalDependency.tdigest
     compile externalDependency."temporal-sdk"
     testCompile project(path: ':gobblin-cluster', configuration: 'tests')
     testCompile project(":gobblin-example")
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
index 862b46c40f..a43f803914 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
@@ -30,6 +30,11 @@ import 
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 /** Activity for generating {@link WorkUnit}s and persisting them to the 
{@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
 @ActivityInterface
 public interface GenerateWorkUnits {
+
+  public static final String NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES = 
GenerateWorkUnits.class.getName() + ".numWorkUnitsSizeInfoQuantiles";
+  public static final int DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES = 10;
+
+
   /** @return the number of {@link WorkUnit}s generated and persisted */
   @ActivityMethod
   GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, 
EventSubmitterContext eventSubmitterContext);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 8344bae6f9..4996c16c1c 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -23,17 +23,23 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import com.google.api.client.util.Lists;
-import com.google.common.io.Closer;
-
 import io.temporal.failure.ApplicationFailure;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
+import com.google.api.client.util.Lists;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closer;
+import com.tdunning.math.stats.TDigest;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
 import org.apache.gobblin.destination.DestinationDatasetHandlerService;
 import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -41,6 +47,7 @@ import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
 import 
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.source.Source;
 import org.apache.gobblin.source.WorkUnitStreamSource;
 import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
@@ -50,6 +57,7 @@ import org.apache.gobblin.source.workunit.WorkUnitStream;
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -58,6 +66,39 @@ import 
org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 @Slf4j
 public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
 
+  /** [Internal, implementation class] Size sketch/digest of a collection of 
{@link MultiWorkUnit}s */
+  @Data
+  @VisibleForTesting
+  protected static class WorkUnitsSizeDigest {
+    private final long totalSize;
+    /** a top-level work unit has no parent - a root */
+    private final TDigest topLevelWorkUnitsSizeDigest;
+    /** a constituent work unit has no children - a leaf */
+    private final TDigest constituentWorkUnitsSizeDigest;
+
+    public WorkUnitsSizeSummary asSizeSummary(int numQuantiles) {
+      Preconditions.checkArgument(numQuantiles > 0, "numQuantiles must be > 
0");
+      final double quantilesWidth = 1.0 / numQuantiles;
+
+      List<Double> topLevelQuantileValues = 
getQuantiles(topLevelWorkUnitsSizeDigest, numQuantiles);
+      List<Double> constituentQuantileValues = 
getQuantiles(constituentWorkUnitsSizeDigest, numQuantiles);
+      return new WorkUnitsSizeSummary(
+          totalSize,
+          topLevelWorkUnitsSizeDigest.size(), 
constituentWorkUnitsSizeDigest.size(),
+          numQuantiles, quantilesWidth,
+          topLevelQuantileValues, constituentQuantileValues);
+    }
+
+    private static List<Double> getQuantiles(TDigest digest, int numQuantiles) 
{
+      List<Double> quantileMinSizes = Lists.newArrayList();
+      for (int i = 1; i <= numQuantiles; i++) {
+        quantileMinSizes.add(digest.quantile((i * 1.0) / numQuantiles));
+      }
+      return quantileMinSizes;
+    }
+  }
+
+
   @Override
   public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, 
EventSubmitterContext eventSubmitterContext) {
     // TODO: decide whether to acquire a job lock (as MR did)!
@@ -80,12 +121,18 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
       FileSystem fs = JobStateUtils.openFileSystem(jobState);
       fs.mkdirs(workDirRoot);
 
-      Set<String> resourcesToCleanUp = new HashSet<>();
-      List<WorkUnit> workUnits = 
generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, 
eventSubmitterContext, closer, resourcesToCleanUp);
+      Set<String> pathsToCleanUp = new HashSet<>();
+      List<WorkUnit> workUnits = 
generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, 
eventSubmitterContext, closer, pathsToCleanUp);
+
+      int numSizeSummaryQuantiles = 
getConfiguredNumSizeSummaryQuantiles(jobState);
+      WorkUnitsSizeSummary wuSizeSummary = 
digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles);
+      log.info("Discovered WorkUnits: {}", wuSizeSummary);
+
       JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
-      JobStateUtils.writeJobState(jobState, workDirRoot, fs);
+      JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: 
the writing of `JobState` after all WUs signifies WU gen+serialization now 
complete
 
-      return new GenerateWorkUnitsResult(jobState.getTaskCount(), 
resourcesToCleanUp);
+      String sourceClassName = JobStateUtils.getSourceClassName(jobState);
+      return new GenerateWorkUnitsResult(jobState.getTaskCount(), 
sourceClassName, wuSizeSummary, pathsToCleanUp);
     } catch (ReflectiveOperationException roe) {
       String errMsg = "Unable to construct a source for generating workunits 
for job " + jobState.getJobId();
       log.error(errMsg, roe);
@@ -101,7 +148,7 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
   }
 
   protected List<WorkUnit> 
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, 
EventSubmitterContext eventSubmitterContext, Closer closer,
-      Set<String> resourcesToCleanUp)
+      Set<String> pathsToCleanUp)
       throws ReflectiveOperationException {
     Source<?, ?> source = JobStateUtils.createSource(jobState);
     WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
@@ -127,7 +174,7 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
     DestinationDatasetHandlerService datasetHandlerService = closer.register(
         new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, 
eventSubmitterContext.create()));
     WorkUnitStream handledWorkUnitStream = 
datasetHandlerService.executeHandlers(workUnitStream);
-    
resourcesToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
+    pathsToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
     // initialize writer and converter(s)
     // TODO: determine whether registration here is effective, or the 
lifecycle of this activity is too brief (as is likely!)
     closer.register(WriterInitializerFactory.newInstace(jobState, 
handledWorkUnitStream)).initialize();
@@ -151,7 +198,7 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
   }
 
   protected static Set<String> calculateWorkDirsToCleanup(WorkUnitStream 
workUnitStream) {
-    Set<String> resourcesToCleanUp = new HashSet<>();
+    Set<String> workDirPaths = new HashSet<>();
     // Validate every workunit if they have the temp dir props since some 
workunits may be commit steps
     Iterator<WorkUnit> workUnitIterator = workUnitStream.getWorkUnits();
     while (workUnitIterator.hasNext()) {
@@ -159,16 +206,16 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
       if (workUnit.isMultiWorkUnit()) {
         List<WorkUnit> workUnitList = ((MultiWorkUnit) 
workUnit).getWorkUnits();
         for (WorkUnit wu : workUnitList) {
-          
resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu));
+          // WARNING/TODO: NOT resilient to nested multi-workunits... should 
it be?
+          workDirPaths.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu));
         }
       } else {
-        
resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit));
+        
workDirPaths.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit));
       }
     }
-    return resourcesToCleanUp;
+    return workDirPaths;
   }
 
-
   private static Set<String> 
collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit workUnit) {
     Set<String> resourcesToCleanUp = new HashSet<>();
     if (workUnit.contains(ConfigurationKeys.WRITER_STAGING_DIR)) {
@@ -183,4 +230,41 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
     }
     return resourcesToCleanUp;
   }
+
+  /** @return the {@link WorkUnitsSizeDigest} for `workUnits` */
+  protected static WorkUnitsSizeDigest digestWorkUnitsSize(List<WorkUnit> 
workUnits) {
+    AtomicLong totalSize = new AtomicLong(0L);
+    TDigest topLevelWorkUnitsDigest = TDigest.createDigest(100);
+    TDigest constituentWorkUnitsDigest = TDigest.createDigest(100);
+
+    Iterator<WorkUnit> workUnitIterator = workUnits.iterator();
+    while (workUnitIterator.hasNext()) {
+      WorkUnit workUnit = workUnitIterator.next();
+      if (workUnit.isMultiWorkUnit()) {
+        List<WorkUnit> subWorkUnitsList = ((MultiWorkUnit) 
workUnit).getWorkUnits();
+        AtomicLong mwuAggSize = new AtomicLong(0L);
+        // WARNING/TODO: NOT resilient to nested multi-workunits... should it 
be?
+        subWorkUnitsList.stream().mapToLong(wu -> 
wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0)).forEach(wuSize -> {
+          constituentWorkUnitsDigest.add(wuSize);
+          mwuAggSize.addAndGet(wuSize);
+        });
+        totalSize.addAndGet(mwuAggSize.get());
+        topLevelWorkUnitsDigest.add(mwuAggSize.get());
+      } else {
+        long wuSize = workUnit.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 
0);
+        totalSize.addAndGet(wuSize);
+        constituentWorkUnitsDigest.add(wuSize);
+        topLevelWorkUnitsDigest.add(wuSize);
+      }
+    }
+
+    // TODO - decide whether helpful/necessary to `.compress()`
+    topLevelWorkUnitsDigest.compress();
+    constituentWorkUnitsDigest.compress();
+    return new WorkUnitsSizeDigest(totalSize.get(), topLevelWorkUnitsDigest, 
constituentWorkUnitsDigest);
+  }
+
+  public static int getConfiguredNumSizeSummaryQuantiles(State state) {
+    return 
state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, 
GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
index 37d1f90477..52da7b5299 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -46,6 +46,7 @@ import org.apache.gobblin.runtime.SourceDecorator;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.source.Source;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import 
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.ParallelRunner;
@@ -76,9 +77,14 @@ public class JobStateUtils {
     return Help.loadFileSystemForUriForce(getFileSystemUri(jobState), 
jobState);
   }
 
-  /** @return a new instance of {@link Source} identified by {@link 
ConfigurationKeys#SOURCE_CLASS_KEY} */
+  /** @return the FQ class name, presumed configured as {@link 
ConfigurationKeys#SOURCE_CLASS_KEY} */
+  public static String getSourceClassName(JobState jobState) {
+    return jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY);
+  }
+
+  /** @return a new instance of {@link Source}, identified by {@link 
ConfigurationKeys#SOURCE_CLASS_KEY} */
   public static Source<?, ?> createSource(JobState jobState) throws 
ReflectiveOperationException {
-    Class<?> sourceClass = 
Class.forName(jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY));
+    Class<?> sourceClass = Class.forName(getSourceClassName(jobState));
     log.info("Creating source: '{}'", sourceClass.getName());
     Source<?, ?> source = new SourceDecorator<>(
         Source.class.cast(sourceClass.newInstance()),
@@ -145,7 +151,10 @@ public class JobStateUtils {
     return fs.makeQualified(jobOutputPath);
   }
 
-  /** write serialized {@link WorkUnit}s in parallel into files named after 
the jobID and task IDs */
+  /**
+   * write serialized {@link WorkUnit}s in parallel into files named to tunnel 
{@link org.apache.gobblin.util.WorkUnitSizeInfo}.
+   * {@link EagerFsDirBackedWorkUnitClaimCheckWorkload} (and possibly others) 
may later recover such size info.
+   */
   public static void writeWorkUnits(List<WorkUnit> workUnits, Path 
workDirRootPath, JobState jobState, FileSystem fs)
       throws IOException {
     String jobId = jobState.getJobId();
@@ -159,7 +168,8 @@ public class JobStateUtils {
       JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new 
JobLauncherUtils.WorkUnitPathCalculator();
       int i = 0;
       for (WorkUnit workUnit : workUnits) {
-        Path workUnitFile = pathCalculator.calcNextPath(workUnit, jobId, 
targetDirPath);
+        // tunnel each WU's size info via its filename, for 
`EagerFsDirBackedWorkUnitClaimCheckWorkload#extractTunneledWorkUnitSizeInfo`
+        Path workUnitFile = 
pathCalculator.calcNextPathWithTunneledSizeInfo(workUnit, jobId, targetDirPath);
         if (i++ == 0) {
           log.info("Writing work unit file [first of {}]: '{}'", 
workUnits.size(), workUnitFile);
         }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
index ac75971d2b..24f6363093 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
@@ -19,12 +19,19 @@ package org.apache.gobblin.temporal.ddm.work;
 
 import java.net.URI;
 import java.util.Comparator;
+import java.util.Optional;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
+import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.Id;
+import org.apache.gobblin.util.WorkUnitSizeInfo;
 
 
 /**
@@ -33,6 +40,7 @@ import 
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
  */
 @lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @lombok.ToString(callSuper = true)
+@Slf4j
 public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends 
AbstractEagerFsDirBackedWorkload<WorkUnitClaimCheck> {
   private EventSubmitterContext eventSubmitterContext;
 
@@ -43,8 +51,9 @@ public class EagerFsDirBackedWorkUnitClaimCheckWorkload 
extends AbstractEagerFsD
 
   @Override
   protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) {
-    // begin by setting all correlators to empty
-    return new WorkUnitClaimCheck("", this.getFileSystemUri(), 
fileStatus.getPath().toString(), this.eventSubmitterContext);
+    // begin by setting all correlators to empty string - later we'll 
`acknowledgeOrdering()`
+    Path filePath = fileStatus.getPath();
+    return new WorkUnitClaimCheck("", this.getFileSystemUri(), 
filePath.toString(), extractTunneledWorkUnitSizeInfo(filePath), 
this.eventSubmitterContext);
   }
 
   @Override
@@ -58,4 +67,20 @@ public class EagerFsDirBackedWorkUnitClaimCheckWorkload 
extends AbstractEagerFsD
    // later, after the post-total-ordering indices are known, use each item's 
index as its correlator
     item.setCorrelator(Integer.toString(index));
   }
+
+  /**
+   * @return the {@link WorkUnitSizeInfo}, when encoded in the filename; 
otherwise {@link WorkUnitSizeInfo#empty()} when no size info about {@link 
WorkUnit}
+   * @see 
org.apache.gobblin.util.JobLauncherUtils.WorkUnitPathCalculator#calcNextPathWithTunneledSizeInfo(WorkUnit,
 String, Path)
+   */
+  protected static WorkUnitSizeInfo extractTunneledWorkUnitSizeInfo(Path 
filePath) {
+    String fileName = filePath.getName();
+    Optional<WorkUnitSizeInfo> optSizeInfo = Optional.empty();
+    try {
+      String maybeEncodedSizeInfo = Id.parse(fileName.substring(0, 
fileName.lastIndexOf('.'))).getName(); // strip extension
+      optSizeInfo = WorkUnitSizeInfo.decode(maybeEncodedSizeInfo);
+    } catch (Exception e) { // log, but swallow any `Id.parse` error
+      log.warn("Filename NOT `Id.parse`able: '" + filePath + "' - " + 
e.getMessage());
+    }
+    return optSizeInfo.orElse(WorkUnitSizeInfo.empty());
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
index 5f798055e2..f30998ae85 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
@@ -33,7 +33,10 @@ import lombok.RequiredArgsConstructor;
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
 public class GenerateWorkUnitsResult {
+  // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite 
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
   @NonNull private int generatedWuCount;
+  @NonNull private String sourceClass;
+  @NonNull private WorkUnitsSizeSummary workUnitsSizeSummary;
   // Resources that the Temporal Job Launcher should clean up for Gobblin 
temporary work directory paths in writers
   @NonNull private Set<String> workDirPathsToDelete;
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
index ff3c9eb6f2..02ba87b3fa 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
@@ -48,8 +48,7 @@ import 
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 @EqualsAndHashCode(callSuper = true) // to prevent findbugs warning - "equals 
method overrides equals in superclass and may not be symmetric"
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 public class PriorJobStateWUProcessingSpec extends WUProcessingSpec {
-  @NonNull
-  private List<Tag<?>> tags = new ArrayList<>();
+  @NonNull private List<Tag<?>> tags = new ArrayList<>();
   @NonNull private String metricsSuffix = 
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
 
   public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir, 
EventSubmitterContext eventSubmitterContext) {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
index 5cb2064b11..e454c9ceef 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
@@ -21,18 +21,19 @@ import java.net.URI;
 
 import org.apache.hadoop.fs.Path;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.WorkUnitSizeInfo;
 
 
 /**
@@ -47,6 +48,7 @@ public class WorkUnitClaimCheck implements FileSystemApt, 
FileSystemJobStateful
   @NonNull private String correlator;
   @NonNull private URI fileSystemUri;
   @NonNull private String workUnitPath;
+  @NonNull private WorkUnitSizeInfo workUnitSizeInfo;
   @NonNull private EventSubmitterContext eventSubmitterContext;
 
   @JsonIgnore // (because no-arg method resembles 'java bean property')
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
similarity index 50%
copy from 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
copy to 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
index 5f798055e2..3ea426c284 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
@@ -17,23 +17,34 @@
 
 package org.apache.gobblin.temporal.ddm.work;
 
-import java.util.Set;
+import java.util.List;
 
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
 
 /**
- * Data structure representing the result of generating work units, where it 
returns the number of generated work units and
- * the folders should be cleaned up after the full job execution is completed
+ * Total size, counts, and size distributions for a collection of {@link 
MultiWorkUnit}s, both with regard to top-level (possibly multi) {@link 
WorkUnit}s
+ * and individual constituent (purely {@link WorkUnit}s), where:
+ *   * a top-level work unit is one with no parent - a root
+ *   * a constituent work unit is one with no children - a leaf
+ * @see org.apache.gobblin.util.WorkUnitSizeInfo
  */
 @Data
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
-public class GenerateWorkUnitsResult {
-  @NonNull private int generatedWuCount;
-  // Resources that the Temporal Job Launcher should clean up for Gobblin 
temporary work directory paths in writers
-  @NonNull private Set<String> workDirPathsToDelete;
+public class WorkUnitsSizeSummary {
+  // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite 
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
+  @NonNull private long totalSize;
+  @NonNull private long topLevelWorkUnitsCount;
+  @NonNull private long constituentWorkUnitsCount;
+  @NonNull private int quantilesCount;
+  @NonNull private double quantilesWidth;
+  @NonNull private List<Double> topLevelQuantilesMinSizes;
+  @NonNull private List<Double> constituentQuantilesMinSizes;
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 2aa2a7e649..1d5a63b736 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -49,6 +49,7 @@ import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.DirDeletionResult;
 import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
 import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
@@ -97,14 +98,15 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   @Override
   public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
-    timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit();
-    EventTimer timer = timerFactory.createJobTimer();
+    timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // 
update GaaS: `TimingEvent.JOB_START_TIME`
+    EventTimer jobSuccessTimer = timerFactory.createJobTimer();
     Optional<GenerateWorkUnitsResult> generateWorkUnitResultsOpt = 
Optional.empty();
     WUProcessingSpec wuSpec = createProcessingSpec(jobProps, 
eventSubmitterContext);
     boolean isSuccessful = false;
     try {
       generateWorkUnitResultsOpt = 
Optional.of(genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext));
-      int numWUsGenerated = 
generateWorkUnitResultsOpt.get().getGeneratedWuCount();
+      WorkUnitsSizeSummary wuSizeSummary = 
generateWorkUnitResultsOpt.get().getWorkUnitsSizeSummary();
+      int numWUsGenerated = 
safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary);
       int numWUsCommitted = 0;
       CommitStats commitStats = CommitStats.createEmpty();
       if (numWUsGenerated > 0) {
@@ -112,18 +114,16 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
         commitStats = processWUsWorkflow.process(wuSpec);
         numWUsCommitted = commitStats.getNumCommittedWorkUnits();
       }
-      timer.stop();
+      jobSuccessTimer.stop();
       isSuccessful = true;
       return new ExecGobblinStats(numWUsGenerated, numWUsCommitted, 
jobProps.getProperty(Help.USER_TO_PROXY_KEY),
           commitStats.getDatasetStats());
     } catch (Exception e) {
       // Emit a failed GobblinTrackingEvent to record job failures
-      timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit(); // 
update GaaS: `ExecutionStatus.FAILED`; `TimingEvent.JOB_END_TIME`
       throw ApplicationFailure.newNonRetryableFailureWithCause(
           String.format("Failed Gobblin job %s", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
-          e.getClass().getName(),
-          e
-      );
+          e.getClass().getName(), e);
     } finally {
       // TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid 
flight
       try {
@@ -138,9 +138,7 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
         if (isSuccessful) {
           throw ApplicationFailure.newNonRetryableFailureWithCause(
               String.format("Failed cleaning Gobblin job %s", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
-              e.getClass().getName(),
-              e
-          );
+              e.getClass().getName(), e);
         }
         log.error("Failed to cleanup work dirs", e);
       }
@@ -210,4 +208,18 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
     }
     return resultSet;
   }
+
+  /**
+   * Historical practice counted {@link 
org.apache.gobblin.source.workunit.WorkUnit}s with {@code int} (see e.g. {@link 
JobState#getTaskCount()}).
+   * Updated counting now uses {@code long}, although much code still presumes 
{@code int}.  While we don't presently anticipate jobs exceeding 2 billion
+   * `WorkUnit`s, if it were ever to happen, this method will fail-fast to 
flag the need to address.
+   * @throws IllegalArgumentException if the count exceeds {@link 
Integer#MAX_VALUE}
+   */
+  protected static int 
safelyCastNumConstituentWorkUnitsOrThrow(WorkUnitsSizeSummary wuSizeSummary) {
+    long n = wuSizeSummary.getConstituentWorkUnitsCount();
+    if (n > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException("Too many constituent WorkUnits (" + 
n + ") - exceeds `Integer.MAX_VALUE`!");
+    }
+    return (int) n;
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index ccd2979491..6402e473bf 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -66,8 +66,8 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
     try {
       jobState = Help.loadJobState(workSpec, Help.loadFileSystem(workSpec));
     } catch (Exception e) {
-      log.error("Exception occured during loading jobState", e);
-      throw new RuntimeException("Exception occured during loading jobState", 
e);
+      log.error("Error loading jobState", e);
+      throw new RuntimeException("Error loading jobState", e);
     }
     searchAttributes = 
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
index 1b0f79e78d..764725dd90 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 
 /** An opaque source of {@link 
org.apache.gobblin.temporal.dynamic.ScalingDirective}s */
-public interface ScalingDirectiveSource extends Cloneable {
+public interface ScalingDirectiveSource {
   /** @return {@link ScalingDirective}s - an impl. may choose to return all 
known directives or to give only newer directives than previously returned */
   List<ScalingDirective> getScalingDirectives() throws IOException;
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
index dde5555644..2a070e2ed9 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
@@ -167,7 +167,7 @@ public class WorkforcePlan {
     return profiles.getOrThrow(profileName);
   }
 
-  /** @return [for testing/debugging] the baseline {@link WorkerProfile} - it 
should NEVER {@link WorkforceProfiles.UnknownProfileException} */
+  /** @return [for testing/debugging] the baseline {@link WorkerProfile} - it 
should NEVER throw {@link WorkforceProfiles.UnknownProfileException} */
   @VisibleForTesting
   WorkerProfile peepBaselineProfile() throws 
WorkforceProfiles.UnknownProfileException {
     return profiles.getOrThrow(WorkforceProfiles.BASELINE_NAME);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index c9d9e940ec..e1ac601986 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -30,7 +30,7 @@ import org.apache.gobblin.metrics.event.TimingEvent;
 
 
 /**
- * Boiler plate for tracking elapsed time of events that is compatible with 
{@link Workflow}
+ * Boilerplate for tracking elapsed time of events that is compatible with 
{@link Workflow}
  * by using activities to record time
  *
  * This class is very similar to {@link TimingEvent} but uses {@link Workflow} 
compatible APIs. It's possible to refactor
@@ -106,9 +106,9 @@ public class TemporalEventTimer implements EventTimer {
      * @return a timer that emits an event at the beginning of the job and a 
completion event ends at the end of the job
      */
     public TemporalEventTimer createJobTimer() {
-      TemporalEventTimer startTimer = 
create(TimingEvent.LauncherTimings.JOB_START);
+      TemporalEventTimer startTimer = 
create(TimingEvent.LauncherTimings.JOB_START); // update GaaS: 
`ExecutionStatus.RUNNING`
       startTimer.stop(Instant.EPOCH); // Emit start job event containing a 
stub end time
-      // GaaS job status monitor tracks for SUCCEEDED events or FAILED events 
for job completion
+      // [upon `.stop()`] update GaaS: `ExecutionStatus.COMPLETE`, 
`TimingEvent.JOB_END_TIME`:
       return create(TimingEvent.LauncherTimings.JOB_SUCCEEDED, 
startTimer.startTime);
     }
   }
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
index 86c5ac12de..8c94783a7d 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
@@ -24,10 +24,12 @@ import java.util.Set;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
 
 
 public class GenerateWorkUnitsImplTest {
@@ -36,7 +38,7 @@ public class GenerateWorkUnitsImplTest {
   public void testFetchesWorkDirsFromWorkUnits() {
     List<WorkUnit> workUnits = new ArrayList<>();
     for (int i = 0; i < 5; i++) {
-      WorkUnit workUnit = new WorkUnit();
+      WorkUnit workUnit = WorkUnit.createEmpty();
       workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/" + i);
       workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/" + i);
       workUnit.setProp("qualitychecker.row.err.file", 
"/tmp/jobId/row-err/file");
@@ -54,9 +56,9 @@ public class GenerateWorkUnitsImplTest {
   public void testFetchesWorkDirsFromMultiWorkUnits() {
     List<WorkUnit> workUnits = new ArrayList<>();
     for (int i = 0; i < 5; i++) {
-      MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
+      MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
       for (int j = 0; j < 3; j++) {
-        WorkUnit workUnit = new WorkUnit();
+        WorkUnit workUnit = WorkUnit.createEmpty();
         workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/");
         workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/");
         workUnit.setProp("qualitychecker.row.err.file", 
"/tmp/jobId/row-err/file");
@@ -76,9 +78,9 @@ public class GenerateWorkUnitsImplTest {
   public void testFetchesUniqueWorkDirsFromMultiWorkUnits() {
     List<WorkUnit> workUnits = new ArrayList<>();
     for (int i = 0; i < 5; i++) {
-      MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
+      MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
       for (int j = 0; j < 3; j++) {
-        WorkUnit workUnit = new WorkUnit();
+        WorkUnit workUnit = WorkUnit.createEmpty();
         // Each MWU will have its own staging and output dir
         workUnit.setProp("writer.staging.dir", "/tmp/jobId/" + i + 
"/task-staging/");
         workUnit.setProp("writer.output.dir", "/tmp/jobId/" + i + 
"task-output/");
@@ -94,4 +96,87 @@ public class GenerateWorkUnitsImplTest {
     Set<String> output = 
GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
     Assert.assertEquals(output.size(), 11);
   }
+
+  @Test
+  public void testDigestWorkUnitsSize() {
+    int numSingleWorkUnits = 5;
+    int numMultiWorkUnits = 15;
+    long singleWorkUnitSizeFactor = 100L;
+    long multiWorkUnitSizeFactor = 70L;
+    List<WorkUnit> workUnits = new ArrayList<>();
+
+    // configure size of non-multi-work-units (increments of 
`singleWorkUnitSizeFactor`, starting from 0)
+    for (int i = 0; i < numSingleWorkUnits; i++) {
+      workUnits.add(createWorkUnitOfSize(i * singleWorkUnitSizeFactor));
+    }
+
+    // configure size of multi-work-units, each containing between 1 and 4 
sub-work-unit children
+    for (int i = 0; i < numMultiWorkUnits; i++) {
+      MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
+      int subWorkUnitCount = 1 + (i % 4); // 1 to 4
+      for (int j = 0; j < subWorkUnitCount; j++) {
+        multiWorkUnit.addWorkUnit(createWorkUnitOfSize((j + 1) * 
multiWorkUnitSizeFactor));
+      }
+      workUnits.add(multiWorkUnit);
+    }
+
+    // calc expectations
+    long expectedTotalSize = 0L;
+    int expectedNumTopLevelWorkUnits = numSingleWorkUnits + numMultiWorkUnits;
+    int expectedNumConstituentWorkUnits = numSingleWorkUnits;
+    for (int i = 0; i < numSingleWorkUnits; i++) {
+      expectedTotalSize += i * singleWorkUnitSizeFactor;
+    }
+    for (int i = 0; i < numMultiWorkUnits; i++) {
+      int numSubWorkUnitsThisMWU = 1 + (i % 4);
+      expectedNumConstituentWorkUnits += numSubWorkUnitsThisMWU;
+      for (int j = 0; j < numSubWorkUnitsThisMWU; j++) {
+        expectedTotalSize += (j + 1) * multiWorkUnitSizeFactor;
+      }
+    }
+
+    GenerateWorkUnitsImpl.WorkUnitsSizeDigest wuSizeDigest = 
GenerateWorkUnitsImpl.digestWorkUnitsSize(workUnits);
+
+    Assert.assertEquals(wuSizeDigest.getTotalSize(), expectedTotalSize);
+    Assert.assertEquals(wuSizeDigest.getTopLevelWorkUnitsSizeDigest().size(), 
expectedNumTopLevelWorkUnits);
+    
Assert.assertEquals(wuSizeDigest.getConstituentWorkUnitsSizeDigest().size(), 
expectedNumConstituentWorkUnits);
+
+    int numQuantilesDesired = expectedNumTopLevelWorkUnits; // for simpler 
math during quantile verification (below)
+    WorkUnitsSizeSummary wuSizeInfo = 
wuSizeDigest.asSizeSummary(numQuantilesDesired);
+    Assert.assertEquals(wuSizeInfo.getTotalSize(), expectedTotalSize);
+    Assert.assertEquals(wuSizeInfo.getTopLevelWorkUnitsCount(), 
expectedNumTopLevelWorkUnits);
+    Assert.assertEquals(wuSizeInfo.getConstituentWorkUnitsCount(), 
expectedNumConstituentWorkUnits);
+    Assert.assertEquals(wuSizeInfo.getQuantilesCount(), numQuantilesDesired);
+    Assert.assertEquals(wuSizeInfo.getQuantilesWidth(), 1.0 / 
expectedNumTopLevelWorkUnits);
+    Assert.assertEquals(wuSizeInfo.getTopLevelQuantilesMinSizes().size(), 
numQuantilesDesired); // same as `asSizeSummary` param
+    Assert.assertEquals(wuSizeInfo.getConstituentQuantilesMinSizes().size(), 
numQuantilesDesired); // same as `asSizeSummary` param
+
+    // expected sizes for (n=5) top-level non-multi-WUs: (1x) 0, (1x) 100, 
(1x) 200, (1x) 300, (1x) 400
+    // expected sizes for (n=15) top-level multi-WUs: [a] (4x) 70; [b] (4x) 
210 (= 70+140); [c] (4x) 420 (= 70+140+210); [d] (3x) 700 (= 70+140+210+280)
+    Assert.assertEquals(wuSizeInfo.getTopLevelQuantilesMinSizes().toArray(),
+        new Double[]{
+            70.0, 70.0, 70.0, 70.0, // 4x MWU [a]
+            100.0, 200.0, // non-MWU [2, 3]
+            210.0, 210.0, 210.0, 210.0, // 4x MWU [b]
+            300.0, 400.0, // non-MWU [4, 5]
+            420.0, 420.0, 420.0, 420.0, // 4x MWU [c]
+            700.0, 700.0, 700.0, 700.0 }); // 3x MWU [d] + "100-percentile" 
(all WUs)
+
+    // expected sizes for (n=36) constituents from multi-WUs: [m] (15x = 
4+4+4+3) 70; [n] (11x = 4+4+3) 140; [o] (7x = 4+3) 210; [p] (3x) 280
+    Assert.assertEquals(wuSizeInfo.getConstituentQuantilesMinSizes().toArray(),
+        new Double[]{
+            70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, // (per 15x MWU [m]) - 
15/41 * 20 ~ 7.3
+            100.0, // non-MWU [2]
+            140.0, 140.0, 140.0, 140.0, 140.0, // (per 11x MWU [n]) - 
(15+1+11/41) * 20 ~ 13.2  |  13.2 - 8 = 5.2
+            200.0, // non-MWU [3]
+            210.0, 210.0, 210.0, // (per 7x MWU [o]) - (15+1+11+1+7/41) * 20 ~ 
17.0  |  17.0 - 14 = 3
+            280.0, 280.0, // 3x MWU [p] ... (15+1+11+1+7+3/41) * 20 ~ 18.5  |  
18.5 - 17 = 2
+            400.0 }); // with only one 20-quantile remaining, non-MWU [5] 
completes the "100-percentile" (all WUs)
+  }
+
+  public static WorkUnit createWorkUnitOfSize(long size) {
+    WorkUnit workUnit = WorkUnit.createEmpty();
+    workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size);
+    return workUnit;
+  }
 }
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
new file mode 100644
index 0000000000..6df60857eb
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.mockito.Mockito;
+import org.mockito.MockedStatic;
+
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.WorkUnitSizeInfo;
+
+
+/** Tests for {@link EagerFsDirBackedWorkUnitClaimCheckWorkload} */
+public class EagerFsDirBackedWorkUnitClaimCheckWorkloadTest {
+
+  private EagerFsDirBackedWorkUnitClaimCheckWorkload workload;
+  private EventSubmitterContext eventSubmitterContext;
+  private FileSystem mockFileSystem;
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    URI fileSystemUri = new URI("hdfs://localhost:9000");
+    String hdfsDir = "/test/dir";
+    eventSubmitterContext = Mockito.mock(EventSubmitterContext.class);
+    workload = Mockito.spy(new 
EagerFsDirBackedWorkUnitClaimCheckWorkload(fileSystemUri, hdfsDir, 
eventSubmitterContext));
+    mockFileSystem = Mockito.mock(FileSystem.class);
+
+    MockedStatic<HadoopUtils> mockedHadoopUtils = 
Mockito.mockStatic(HadoopUtils.class);
+    mockedHadoopUtils.when(() -> HadoopUtils.getFileSystem(Mockito.any(), 
Mockito.any())).thenReturn(mockFileSystem);
+  }
+
+  @Test
+  public void testExtractWorkUnitSizeInfo() throws Exception {
+    // mock `FileStatus` having an encoded WorkUnitSizeInfo as filename
+    List<FileStatus> fileStatuses = Arrays.asList(
+        
createMockFileStatus("/test/dir/multitask_n=3-total=100-median=100.0-mean=100.0-stddev=0.0_0.mwu"),
+        
createMockFileStatus("/test/dir/multitask_n=2-total=200-median=100.0-mean=100.0-stddev=0.0_1.mwu"),
+        
createMockFileStatus("/test/dir/task_n=1-total=300-median=100.0-mean=100.0-stddev=0.0_5.wu"),
+        
createMockFileStatus("/test/dir/multitask_n=4-total=400-median=100.0-mean=100.0-stddev=0.0_30.mwu"),
+        
createMockFileStatus("/test/dir/task_n=1-total=500-median=100.0-mean=100.0-stddev=0.0_2.wu")
+    );
+
+    Mockito.when(mockFileSystem.listStatus(Mockito.any(Path.class), 
Mockito.any(PathFilter.class)))
+        .thenReturn(fileStatuses.toArray(new FileStatus[0]));
+
+    Optional<Workload.WorkSpan<WorkUnitClaimCheck>> span = workload.getSpan(0, 
5);
+    Assert.assertTrue(span.isPresent());
+
+    Workload.WorkSpan<WorkUnitClaimCheck> workSpan = span.get();
+    Assert.assertEquals(workSpan.getNumElems(), fileStatuses.size());
+
+    Iterable<WorkUnitClaimCheck> workSpanIterable = () -> workSpan;
+    List<WorkUnitClaimCheck> wuClaimChecks = 
StreamSupport.stream(workSpanIterable.spliterator(), false)
+        .collect(Collectors.toList());
+
+    // note: since ordering is based on filename, it will not be the same as 
`fileStatuses` above
+    for (WorkUnitClaimCheck workUnitClaimCheck : wuClaimChecks) {
+      WorkUnitSizeInfo sizeInfo = workUnitClaimCheck.getWorkUnitSizeInfo();
+      Assert.assertNotNull(sizeInfo);
+      int n = sizeInfo.getNumConstituents();
+      long expectedTotalSize = (n == 1 && 
workUnitClaimCheck.getWorkUnitPath().endsWith("_5.wu")) ? 300 :
+          n == 1 ? 500 :
+              n == 2 ? 200 :
+                  n == 3 ? 100 :
+                      n == 4 ? 400 :
+                          -1; // unexpected (sentinel)
+      Assert.assertEquals(sizeInfo.getTotalSize(), expectedTotalSize);
+    }
+  }
+
+  private static FileStatus createMockFileStatus(String path) {
+    FileStatus fileStatus = Mockito.mock(FileStatus.class);
+    Path filePath = new Path(path);
+    Mockito.when(fileStatus.getPath()).thenReturn(filePath);
+    return fileStatus;
+  }
+}
+
diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle
index b916f214cd..80e73afcfd 100644
--- a/gobblin-utility/build.gradle
+++ b/gobblin-utility/build.gradle
@@ -40,6 +40,7 @@ dependencies {
   compile externalDependency.metricsCore
   compile externalDependency.guavaretrying
   compile externalDependency.guice
+  compile externalDependency.tdigest
   compile externalDependency.typesafeConfig
   compile externalDependency.commonsPool
   compile externalDependency.hadoopClientCommon
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index f70f11f752..d1f97681ee 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.cache.Cache;
@@ -40,8 +42,6 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -66,13 +66,26 @@ public class JobLauncherUtils {
   public static class WorkUnitPathCalculator {
     private final AtomicInteger nextMultiWorkUnitTaskId = new AtomicInteger(0);
 
-    // Serialize each work unit into a file named after the task ID
+    /** @return `Path` beneath `basePath` to serialize `workUnit`, with file 
named after the task ID (itself named after the job ID) */
     public Path calcNextPath(WorkUnit workUnit, String jobId, Path basePath) {
       String workUnitFileName = workUnit.isMultiWorkUnit()
           ? JobLauncherUtils.newMultiTaskId(jobId, 
nextMultiWorkUnitTaskId.getAndIncrement()) + 
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
           : workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + 
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
       return new Path(basePath, workUnitFileName);
     }
+
+    /**
+     * Calculate where to serialize {@link WorkUnit}, using a filename that tunnels 
{@link WorkUnitSizeInfo} (vs. repeating the task/job ID, as was legacy practice).
+     * This provides a direct and simple way to tunnel limited, but crucial, 
metadata that incurs no additional access cost (nor adds FS load).
+     * @return `Path` beneath `basePath` to serialize `workUnit`
+     */
+    public Path calcNextPathWithTunneledSizeInfo(WorkUnit workUnit, String 
jobId, Path basePath) {
+      String encodedSizeInfo = WorkUnitSizeInfo.forWorkUnit(workUnit).encode();
+      String workUnitFileName = workUnit.isMultiWorkUnit()
+          ? Id.MultiTask.create(encodedSizeInfo, 
nextMultiWorkUnitTaskId.getAndIncrement()) + 
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
+          : Id.Task.create(encodedSizeInfo, 
workUnit.getPropAsInt(ConfigurationKeys.TASK_KEY_KEY)) + 
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
+      return new Path(basePath, workUnitFileName);
+    }
   }
 
   /**
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
new file mode 100644
index 0000000000..107590407a
--- /dev/null
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.tdunning.math.stats.TDigest;
+
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Bare-bones size information about a {@link WorkUnit}, possibly a {@link 
MultiWorkUnit}, where a constituent work unit is one with no children - a leaf.
+ *
+ * Measurement currently requires the `WorkUnit` to define {@link 
ServiceConfigKeys#WORK_UNIT_SIZE}, otherwise sizes will be 0 with merely the 
count of
+ * constituent `WorkUnits`.  For the most part, at present, that key is 
supplied only by {@link org.apache.gobblin.data.management.copy.CopySource}.
+ * Nonetheless, the "contract" for any {@link 
org.apache.gobblin.source.Source} is both clear and reasonable: just add "size" 
to your `WorkUnit`s to
+ * participate.
+ *
+ * Some sources might count bytes, others num records, possibly with those 
size-weighted; and of course not all sources extract a definite
+ * amount of data, known up front.  In such cases, the {@link 
#numConstituents} (aka. parallelism potential) may be most informative.
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class WorkUnitSizeInfo {
+  // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite 
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
+  @NonNull private int numConstituents;
+  @NonNull private long totalSize;
+  @NonNull private double medianSize;
+  @NonNull private double meanSize;
+  @NonNull private double stddevSize;
+
+  /** @return the 'zero' {@link WorkUnitSizeInfo} */
+  public static WorkUnitSizeInfo empty() {
+    return new WorkUnitSizeInfo(0, 0, 0.0, 0.0, 0.0);
+  }
+
+  /**
+   * convenience factory to measure a {@link WorkUnit} - preferable to direct 
ctor call
+   * @return {@link #empty()} when the `WorkUnit` is not measurable by 
defining {@link ServiceConfigKeys#WORK_UNIT_SIZE}
+   */
+  public static WorkUnitSizeInfo forWorkUnit(WorkUnit workUnit) {
+    // NOTE: redundant `instanceof` merely to appease FindBugs - 
"Unchecked/unconfirmed cast ..."
+    if (!workUnit.isMultiWorkUnit() || !(workUnit instanceof MultiWorkUnit)) {
+      long wuSize = workUnit.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 
0);
+      return new WorkUnitSizeInfo(1, wuSize, wuSize, wuSize, 0.0);
+    } else {
+      // WARNING/TODO: NOT resilient to nested multi-workunits... should it be?
+      List<WorkUnit> subWorkUnitsList = ((MultiWorkUnit) 
workUnit).getWorkUnits();
+      if (subWorkUnitsList.isEmpty()) {
+        return WorkUnitSizeInfo.empty();
+      }
+      int n = subWorkUnitsList.size();
+      TDigest constituentWorkUnitsDigest = TDigest.createDigest(100);
+      AtomicLong totalSize = new AtomicLong(0L);
+      List<Long> subWorkUnitSizes = subWorkUnitsList.stream().mapToLong(wu -> 
wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0))
+          .boxed().collect(Collectors.toList());
+      subWorkUnitSizes.forEach(wuSize -> {
+        constituentWorkUnitsDigest.add(wuSize);
+        totalSize.addAndGet(wuSize);
+      });
+      double mean = totalSize.get() * 1.0 / n;
+      double variance = subWorkUnitSizes.stream().mapToDouble(wuSize -> {
+        double meanDiff = wuSize - mean;
+        return meanDiff * meanDiff;
+      }).sum() / n;
+      double stddev = Math.sqrt(variance);
+      double median = constituentWorkUnitsDigest.quantile(0.5);
+      return new WorkUnitSizeInfo(n, totalSize.get(), median, mean, stddev);
+    }
+  }
+
+  /**
+   * @return stringified, human-readable prop+value encoding that may be 
inverted with {@link #decode(String)}
+   *
+   * NOTE: The resulting encoded form will be between 45 and 117 chars:
+   *   - presuming - a 32-bit int (max 10 digits), a 64-bit long (max 19 
digits), and a 64-bit double (max 19 digits)
+   *   * 86 digits maximum for the values - 1*int + 1*long + 3*double = 10 + 
19 + 3*19 = 86
+   *   * 14 digits minimum for the values (each `%.2f`-formatted double is at 
least "0.00", i.e. 4 chars) - 1*int + 1*long + 3*double = 1 + 1 + 3*4 = 14
+   *   * 22 digits for the names [1+5+6+4+6]
+   *   * 9 digits for additional syntax [5+4]
+   *   = 117 digits (max)
+   *   =  45 digits (min)
+   */
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  public String encode() {
+    return String.format("n=%d-total=%d-median=%.2f-mean=%.2f-stddev=%.2f",
+        numConstituents, totalSize, medianSize, meanSize, stddevSize);
+  }
+
+  private static final String DECODING_REGEX = 
"^n=(\\d+)-total=(\\d+)-median=(\\d+(?:\\.\\d+)?)-mean=(\\d+(?:\\.\\d+)?)-stddev=(\\d+(?:\\.\\d+)?)$";
+  private static final Pattern decodingPattern = 
Pattern.compile(DECODING_REGEX);
+
+  /** @return the parsed size info, when `encoded` is in {@link 
WorkUnitSizeInfo#encode()}-compatible form; otherwise {@link Optional#empty()} 
*/
+  public static Optional<WorkUnitSizeInfo> decode(String encoded) {
+    Matcher decoding = decodingPattern.matcher(encoded);
+    if (!decoding.matches()) {
+      return Optional.empty();
+    } else {
+      return Optional.of(new WorkUnitSizeInfo(
+          Integer.parseInt(decoding.group(1)),
+          Long.parseLong(decoding.group(2)),
+          Double.parseDouble(decoding.group(3)),
+          Double.parseDouble(decoding.group(4)),
+          Double.parseDouble(decoding.group(5))
+      ));
+    }
+  }
+}
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java
index c11fcc460c..9bdae9ffea 100644
--- 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java
@@ -26,12 +26,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.LoggerFactory;
+
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.Extract.TableType;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -48,6 +50,8 @@ public class JobLauncherUtilsTest {
 
   private static final String JOB_NAME = "foo";
   private static final Pattern PATTERN = Pattern.compile("job_" + JOB_NAME + 
"_\\d+");
+  private final Path wuBasePath = new Path("/abs/base/path");
+  private final String wuPathJobId = Id.Task.create("test_wu_path_JobId", 
6).toString();
   private String jobId;
 
   @Test
@@ -87,6 +91,92 @@ public class JobLauncherUtilsTest {
     
Assert.assertEquals(JobLauncherUtils.flattenWorkUnits(workUnitsAndMultiWorkUnits).size(),
 9);
   }
 
+  @Test
+  public void testCalcNextPathSingleWorkUnit() {
+    WorkUnit workUnit = WorkUnit.createEmpty();
+    workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "task1");
+
+    JobLauncherUtils.WorkUnitPathCalculator pathCalc = new 
JobLauncherUtils.WorkUnitPathCalculator();
+    Path resultPath = pathCalc.calcNextPath(workUnit, wuPathJobId, wuBasePath);
+
+    Assert.assertEquals(resultPath, new Path(wuBasePath, "task1" + 
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION));
+  }
+
+  @Test
+  public void testCalcNextPathWithTunneledSizeInfoSingleWorkUnit() {
+    WorkUnit workUnit = WorkUnit.createEmpty();
+    workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "task1");
+    workUnit.setProp(ConfigurationKeys.TASK_KEY_KEY, "789");
+    workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, "120");
+
+    JobLauncherUtils.WorkUnitPathCalculator pathCalc = new 
JobLauncherUtils.WorkUnitPathCalculator();
+    Path resultPath = pathCalc.calcNextPathWithTunneledSizeInfo(workUnit, 
jobId, wuBasePath);
+
+    String encodedSizeInfo = WorkUnitSizeInfo.forWorkUnit(workUnit).encode();
+    String expectedFileName = Id.Task.PREFIX + "_" + encodedSizeInfo + "_789" 
+ JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
+    Assert.assertEquals(resultPath, new Path(wuBasePath, expectedFileName));
+  }
+
+  @Test
+  public void testCalcNextPathMultiWorkUnit() {
+    MultiWorkUnit multiWorkUnitA = MultiWorkUnit.createEmpty();
+    for (int i = 0; i < 5; i++) {
+      WorkUnit workUnit = WorkUnit.createEmpty();
+      workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_A" + 
i);
+      multiWorkUnitA.addWorkUnit(workUnit);
+    }
+    MultiWorkUnit multiWorkUnitB = MultiWorkUnit.createEmpty();
+    for (int i = 0; i < 3; i++) {
+      WorkUnit workUnit = WorkUnit.createEmpty();
+      workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_B" + 
i);
+      multiWorkUnitB.addWorkUnit(workUnit);
+    }
+
+    JobLauncherUtils.WorkUnitPathCalculator pathCalc = new 
JobLauncherUtils.WorkUnitPathCalculator();
+    Path resultPathA0 = pathCalc.calcNextPath(multiWorkUnitA, wuPathJobId, 
wuBasePath);
+    Path resultPathB1 = pathCalc.calcNextPath(multiWorkUnitB, wuPathJobId, 
wuBasePath);
+    Path resultPathB2 = pathCalc.calcNextPath(multiWorkUnitB, wuPathJobId, 
wuBasePath);
+
+    String expectedFileNameA0 = Id.MultiTask.PREFIX + "_" + 
wuPathJobId.replace("task_", "") + "_0" + 
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+    String expectedFileNameB1 = Id.MultiTask.PREFIX + "_" + 
wuPathJobId.replace("task_", "") + "_1" + 
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+    String expectedFileNameB2 = Id.MultiTask.PREFIX + "_" + 
wuPathJobId.replace("task_", "") + "_2" + 
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+    Assert.assertEquals(resultPathA0, new Path(wuBasePath, 
expectedFileNameA0));
+    Assert.assertEquals(resultPathB1, new Path(wuBasePath, 
expectedFileNameB1));
+    Assert.assertEquals(resultPathB2, new Path(wuBasePath, 
expectedFileNameB2));
+  }
+
+  @Test
+  public void testCalcNextPathWithTunneledSizeInfoMultiWorkUnit() {
+    MultiWorkUnit multiWorkUnitA = MultiWorkUnit.createEmpty();
+    for (int i = 0; i < 5; i++) {
+      WorkUnit workUnit = WorkUnit.createEmpty();
+      workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_A" + 
i);
+      workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, String.valueOf((i + 
1) * 75));
+      multiWorkUnitA.addWorkUnit(workUnit);
+    }
+    MultiWorkUnit multiWorkUnitB = MultiWorkUnit.createEmpty();
+    for (int i = 0; i < 3; i++) {
+      WorkUnit workUnit = WorkUnit.createEmpty();
+      workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_B" + 
i);
+      workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, String.valueOf((i + 
1) * 66));
+      multiWorkUnitB.addWorkUnit(workUnit);
+    }
+
+    JobLauncherUtils.WorkUnitPathCalculator pathCalc = new 
JobLauncherUtils.WorkUnitPathCalculator();
+    Path resultPathA0 = 
pathCalc.calcNextPathWithTunneledSizeInfo(multiWorkUnitA, wuPathJobId, 
wuBasePath);
+    Path resultPathB1 = 
pathCalc.calcNextPathWithTunneledSizeInfo(multiWorkUnitB, wuPathJobId, 
wuBasePath);
+    Path resultPathB2 = 
pathCalc.calcNextPathWithTunneledSizeInfo(multiWorkUnitB, wuPathJobId, 
wuBasePath);
+
+    String encodedSizeInfoA = 
WorkUnitSizeInfo.forWorkUnit(multiWorkUnitA).encode();
+    String encodedSizeInfoB = 
WorkUnitSizeInfo.forWorkUnit(multiWorkUnitB).encode();
+    String expectedFileNameA0 = Id.MultiTask.PREFIX + "_" + encodedSizeInfoA + 
"_0" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+    String expectedFileNameB1 = Id.MultiTask.PREFIX + "_" + encodedSizeInfoB + 
"_1" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+    String expectedFileNameB2 = Id.MultiTask.PREFIX + "_" + encodedSizeInfoB + 
"_2" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+    Assert.assertEquals(resultPathA0, new Path(wuBasePath, 
expectedFileNameA0));
+    Assert.assertEquals(resultPathB1, new Path(wuBasePath, 
expectedFileNameB1));
+    Assert.assertEquals(resultPathB2, new Path(wuBasePath, 
expectedFileNameB2));
+  }
+
   @Test
   public void testDeleteStagingData() throws IOException {
     FileSystem fs = FileSystem.getLocal(new Configuration());
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/WorkUnitSizeInfoTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/WorkUnitSizeInfoTest.java
new file mode 100644
index 0000000000..ab2b689ea8
--- /dev/null
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/WorkUnitSizeInfoTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import java.util.Optional;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/** Tests for {@link WorkUnitSizeInfo}. */
+public class WorkUnitSizeInfoTest {
+
+  @Test
+  public void testMultiWorkUnitSizeInfo() {
+    List<WorkUnit> workUnits = new ArrayList<>();
+    long totalSize = 0;
+    for (int i = 1; i <= 20; i++) {
+      long size = i * 100L;
+      totalSize += size;
+      WorkUnit wu = WorkUnit.createEmpty();
+      wu.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size);
+      workUnits.add(wu);
+    }
+    MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
+    multiWorkUnit.addWorkUnits(workUnits);
+
+    double expectedMean = totalSize * 1.0 / workUnits.size();
+
+    WorkUnitSizeInfo sizeInfo = WorkUnitSizeInfo.forWorkUnit(multiWorkUnit);
+
+    Assert.assertEquals(sizeInfo.getNumConstituents(), 20);
+    Assert.assertEquals(sizeInfo.getTotalSize(), totalSize);
+    Assert.assertEquals(sizeInfo.getMedianSize(), 1100.0, 0.1);
+    Assert.assertEquals(sizeInfo.getMeanSize(), expectedMean, 0.1);
+    Assert.assertEquals(sizeInfo.getStddevSize(), 576.628, 0.1);
+  }
+
+  @Test
+  public void testSingleWorkUnitSizeInfo() {
+    WorkUnit workUnit = WorkUnit.createEmpty();
+    workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, 5432L);
+
+    WorkUnitSizeInfo sizeInfo = WorkUnitSizeInfo.forWorkUnit(workUnit);
+
+    Assert.assertEquals(sizeInfo.getNumConstituents(), 1);
+    Assert.assertEquals(sizeInfo.getTotalSize(), 5432L);
+    Assert.assertEquals(sizeInfo.getMedianSize(), 5432.0, 0.1);
+    Assert.assertEquals(sizeInfo.getMeanSize(), 5432.0, 0.1);
+    Assert.assertEquals(sizeInfo.getStddevSize(), 0.0, 0.1);
+  }
+
+  @Test
+  public void testEncodeThenDecodeRoundTrip() {
+    WorkUnitSizeInfo original = new WorkUnitSizeInfo(20, 21000L, 1100.0, 
1050.0, 576.628);
+    String encoded = original.encode();
+
+    Assert.assertEquals(encoded, 
"n=20-total=21000-median=1100.00-mean=1050.00-stddev=576.63");
+
+    Optional<WorkUnitSizeInfo> optDecoded = WorkUnitSizeInfo.decode(encoded);
+    Assert.assertTrue(optDecoded.isPresent());
+    WorkUnitSizeInfo decoded = optDecoded.get();
+    Assert.assertEquals(decoded.getNumConstituents(), 
original.getNumConstituents());
+    Assert.assertEquals(decoded.getTotalSize(), original.getTotalSize());
+    Assert.assertEquals(decoded.getMedianSize(), original.getMedianSize(), 
0.1);
+    Assert.assertEquals(decoded.getMeanSize(), original.getMeanSize(), 0.1);
+    Assert.assertEquals(decoded.getStddevSize(), original.getStddevSize(), 
0.1);
+  }
+
+  @Test
+  public void testDecodeFromIntFormattedDoubles() {
+    String encoded = "n=20-total=12345-median=1111-mean=617-stddev=543";
+
+    Optional<WorkUnitSizeInfo> optDecoded = WorkUnitSizeInfo.decode(encoded);
+    Assert.assertTrue(optDecoded.isPresent());
+    WorkUnitSizeInfo decoded = optDecoded.get();
+    Assert.assertEquals(decoded.getNumConstituents(), 20);
+    Assert.assertEquals(decoded.getTotalSize(), 12345L);
+    Assert.assertEquals(decoded.getMedianSize(), 1111.0);
+    Assert.assertEquals(decoded.getMeanSize(), 617.0);
+    Assert.assertEquals(decoded.getStddevSize(), 543.0);
+  }
+}
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index 966ef824d7..e4b328c5e1 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -220,6 +220,7 @@ ext.externalDependency = [
         "org.slf4j:slf4j-log4j12:" + slf4jVersion
     ],
     "postgresConnector": "org.postgresql:postgresql:42.1.4",
+    "tdigest": "com.tdunning:t-digest:3.3",
     "testContainers": "org.testcontainers:testcontainers:1.17.3",
     "testContainersMysql": "org.testcontainers:mysql:1.17.3",
     "xz": "org.tukaani:xz:1.8"

Reply via email to