This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 003590e7d5 [GOBBLIN-2179] Enhance GoT observability with
`WorkUnitsSizeSummary` and `WorkUnitSizeInfo` (#4082)
003590e7d5 is described below
commit 003590e7d58691505d8fb73988db35ae636873f1
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Dec 10 21:16:22 2024 -0800
[GOBBLIN-2179] Enhance GoT observability with `WorkUnitsSizeSummary` and
`WorkUnitSizeInfo` (#4082)
---
.../AutoTroubleshooterLogAppender.java | 2 +-
.../gobblin/runtime/troubleshooter/Issue.java | 6 +-
gobblin-temporal/build.gradle | 1 +
.../temporal/ddm/activity/GenerateWorkUnits.java | 5 +
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 112 ++++++++++++++---
.../gobblin/temporal/ddm/util/JobStateUtils.java | 18 ++-
...EagerFsDirBackedWorkUnitClaimCheckWorkload.java | 29 ++++-
.../temporal/ddm/work/GenerateWorkUnitsResult.java | 3 +
.../ddm/work/PriorJobStateWUProcessingSpec.java | 3 +-
.../temporal/ddm/work/WorkUnitClaimCheck.java | 6 +-
...kUnitsResult.java => WorkUnitsSizeSummary.java} | 25 ++--
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 34 +++--
.../impl/ProcessWorkUnitsWorkflowImpl.java | 4 +-
.../temporal/dynamic/ScalingDirectiveSource.java | 2 +-
.../gobblin/temporal/dynamic/WorkforcePlan.java | 2 +-
.../workflows/metrics/TemporalEventTimer.java | 6 +-
.../activity/impl/GenerateWorkUnitsImplTest.java | 95 +++++++++++++-
...rFsDirBackedWorkUnitClaimCheckWorkloadTest.java | 109 ++++++++++++++++
gobblin-utility/build.gradle | 1 +
.../org/apache/gobblin/util/JobLauncherUtils.java | 19 ++-
.../org/apache/gobblin/util/WorkUnitSizeInfo.java | 138 +++++++++++++++++++++
.../apache/gobblin/util/JobLauncherUtilsTest.java | 90 ++++++++++++++
.../apache/gobblin/util/WorkUnitSizeInfoTest.java | 104 ++++++++++++++++
gradle/scripts/dependencyDefinitions.gradle | 1 +
24 files changed, 754 insertions(+), 61 deletions(-)
diff --git
a/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java
b/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java
index bb562bbe81..c3304ccef0 100644
---
a/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java
+++
b/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java
@@ -44,7 +44,7 @@ import
org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
/**
- * Collects messages from log4j and converts them into issues that are used in
{@link AutomaticTroubleshooter}.
+ * Collects messages from log4j and converts them into {@link Issue}s used by
the {@link org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter}.
*/
@Slf4j
@ThreadSafe
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
index 7f694b2032..c726931e95 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java
@@ -50,7 +50,7 @@ public class Issue {
*
* It can be used for making programmatic decisions on how to handle and
recover from this issue.
*
- * The code length should be less than {@link Issue.MAX_ISSUE_CODE_LENGTH}
+ * The code length should be less than {@link Issue#MAX_ISSUE_CODE_LENGTH}
* */
private final String code;
@@ -71,14 +71,14 @@ public class Issue {
*
* This is a full name of the class that logged the error or generated the
issue.
*
- * The class name length should be less than {@link
Issue.MAX_CLASSNAME_LENGTH}
+ * The class name length should be less than {@link
Issue#MAX_CLASSNAME_LENGTH}
* */
private final String sourceClass;
/**
* If the issue was generated from an exception, then a full exception class
name should be stored here.
*
- * The class name length should be less than {@link
Issue.MAX_CLASSNAME_LENGTH}
+ * The class name length should be less than {@link
Issue#MAX_CLASSNAME_LENGTH}
*/
private final String exceptionClass;
diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle
index 1832ac909d..fa34245e91 100644
--- a/gobblin-temporal/build.gradle
+++ b/gobblin-temporal/build.gradle
@@ -62,6 +62,7 @@ dependencies {
compile (externalDependency.helix) {
exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
}
+ compile externalDependency.tdigest
compile externalDependency."temporal-sdk"
testCompile project(path: ':gobblin-cluster', configuration: 'tests')
testCompile project(":gobblin-example")
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
index 862b46c40f..a43f803914 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java
@@ -30,6 +30,11 @@ import
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
/** Activity for generating {@link WorkUnit}s and persisting them to the
{@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
@ActivityInterface
public interface GenerateWorkUnits {
+
+ public static final String NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES =
GenerateWorkUnits.class.getName() + ".numWorkUnitsSizeInfoQuantiles";
+ public static final int DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES = 10;
+
+
/** @return the number of {@link WorkUnit}s generated and persisted */
@ActivityMethod
GenerateWorkUnitsResult generateWorkUnits(Properties jobProps,
EventSubmitterContext eventSubmitterContext);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 8344bae6f9..4996c16c1c 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -23,17 +23,23 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import com.google.api.client.util.Lists;
-import com.google.common.io.Closer;
-
import io.temporal.failure.ApplicationFailure;
+import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import com.google.api.client.util.Lists;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closer;
+import com.tdunning.math.stats.TDigest;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -41,6 +47,7 @@ import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
@@ -50,6 +57,7 @@ import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -58,6 +66,39 @@ import
org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@Slf4j
public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
+ /** [Internal, implementation class] Size sketch/digest of a collection of
{@link MultiWorkUnit}s */
+ @Data
+ @VisibleForTesting
+ protected static class WorkUnitsSizeDigest {
+ private final long totalSize;
+ /** a top-level work unit has no parent - a root */
+ private final TDigest topLevelWorkUnitsSizeDigest;
+ /** a constituent work unit has no children - a leaf */
+ private final TDigest constituentWorkUnitsSizeDigest;
+
+ public WorkUnitsSizeSummary asSizeSummary(int numQuantiles) {
+ Preconditions.checkArgument(numQuantiles > 0, "numQuantiles must be >
0");
+ final double quantilesWidth = 1.0 / numQuantiles;
+
+ List<Double> topLevelQuantileValues =
getQuantiles(topLevelWorkUnitsSizeDigest, numQuantiles);
+ List<Double> constituentQuantileValues =
getQuantiles(constituentWorkUnitsSizeDigest, numQuantiles);
+ return new WorkUnitsSizeSummary(
+ totalSize,
+ topLevelWorkUnitsSizeDigest.size(),
constituentWorkUnitsSizeDigest.size(),
+ numQuantiles, quantilesWidth,
+ topLevelQuantileValues, constituentQuantileValues);
+ }
+
+ private static List<Double> getQuantiles(TDigest digest, int numQuantiles)
{
+ List<Double> quantileMinSizes = Lists.newArrayList();
+ for (int i = 1; i <= numQuantiles; i++) {
+ quantileMinSizes.add(digest.quantile((i * 1.0) / numQuantiles));
+ }
+ return quantileMinSizes;
+ }
+ }
+
+
@Override
public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps,
EventSubmitterContext eventSubmitterContext) {
// TODO: decide whether to acquire a job lock (as MR did)!
@@ -80,12 +121,18 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
FileSystem fs = JobStateUtils.openFileSystem(jobState);
fs.mkdirs(workDirRoot);
- Set<String> resourcesToCleanUp = new HashSet<>();
- List<WorkUnit> workUnits =
generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState,
eventSubmitterContext, closer, resourcesToCleanUp);
+ Set<String> pathsToCleanUp = new HashSet<>();
+ List<WorkUnit> workUnits =
generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState,
eventSubmitterContext, closer, pathsToCleanUp);
+
+ int numSizeSummaryQuantiles =
getConfiguredNumSizeSummaryQuantiles(jobState);
+ WorkUnitsSizeSummary wuSizeSummary =
digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles);
+ log.info("Discovered WorkUnits: {}", wuSizeSummary);
+
JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
- JobStateUtils.writeJobState(jobState, workDirRoot, fs);
+ JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION:
the writing of `JobState` after all WUs signifies WU gen+serialization now
complete
- return new GenerateWorkUnitsResult(jobState.getTaskCount(),
resourcesToCleanUp);
+ String sourceClassName = JobStateUtils.getSourceClassName(jobState);
+ return new GenerateWorkUnitsResult(jobState.getTaskCount(),
sourceClassName, wuSizeSummary, pathsToCleanUp);
} catch (ReflectiveOperationException roe) {
String errMsg = "Unable to construct a source for generating workunits
for job " + jobState.getJobId();
log.error(errMsg, roe);
@@ -101,7 +148,7 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
}
protected List<WorkUnit>
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState,
EventSubmitterContext eventSubmitterContext, Closer closer,
- Set<String> resourcesToCleanUp)
+ Set<String> pathsToCleanUp)
throws ReflectiveOperationException {
Source<?, ?> source = JobStateUtils.createSource(jobState);
WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
@@ -127,7 +174,7 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
DestinationDatasetHandlerService datasetHandlerService = closer.register(
new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs,
eventSubmitterContext.create()));
WorkUnitStream handledWorkUnitStream =
datasetHandlerService.executeHandlers(workUnitStream);
-
resourcesToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
+ pathsToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream));
// initialize writer and converter(s)
// TODO: determine whether registration here is effective, or the
lifecycle of this activity is too brief (as is likely!)
closer.register(WriterInitializerFactory.newInstace(jobState,
handledWorkUnitStream)).initialize();
@@ -151,7 +198,7 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
}
protected static Set<String> calculateWorkDirsToCleanup(WorkUnitStream
workUnitStream) {
- Set<String> resourcesToCleanUp = new HashSet<>();
+ Set<String> workDirPaths = new HashSet<>();
// Validate every workunit if they have the temp dir props since some
workunits may be commit steps
Iterator<WorkUnit> workUnitIterator = workUnitStream.getWorkUnits();
while (workUnitIterator.hasNext()) {
@@ -159,16 +206,16 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
if (workUnit.isMultiWorkUnit()) {
List<WorkUnit> workUnitList = ((MultiWorkUnit)
workUnit).getWorkUnits();
for (WorkUnit wu : workUnitList) {
-
resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu));
+ // WARNING/TODO: NOT resilient to nested multi-workunits... should
it be?
+ workDirPaths.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu));
}
} else {
-
resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit));
+
workDirPaths.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit));
}
}
- return resourcesToCleanUp;
+ return workDirPaths;
}
-
private static Set<String>
collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit workUnit) {
Set<String> resourcesToCleanUp = new HashSet<>();
if (workUnit.contains(ConfigurationKeys.WRITER_STAGING_DIR)) {
@@ -183,4 +230,41 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
}
return resourcesToCleanUp;
}
+
+ /** @return the {@link WorkUnitsSizeDigest} for `workUnits` */
+ protected static WorkUnitsSizeDigest digestWorkUnitsSize(List<WorkUnit>
workUnits) {
+ AtomicLong totalSize = new AtomicLong(0L);
+ TDigest topLevelWorkUnitsDigest = TDigest.createDigest(100);
+ TDigest constituentWorkUnitsDigest = TDigest.createDigest(100);
+
+ Iterator<WorkUnit> workUnitIterator = workUnits.iterator();
+ while (workUnitIterator.hasNext()) {
+ WorkUnit workUnit = workUnitIterator.next();
+ if (workUnit.isMultiWorkUnit()) {
+ List<WorkUnit> subWorkUnitsList = ((MultiWorkUnit)
workUnit).getWorkUnits();
+ AtomicLong mwuAggSize = new AtomicLong(0L);
+ // WARNING/TODO: NOT resilient to nested multi-workunits... should it
be?
+ subWorkUnitsList.stream().mapToLong(wu ->
wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0)).forEach(wuSize -> {
+ constituentWorkUnitsDigest.add(wuSize);
+ mwuAggSize.addAndGet(wuSize);
+ });
+ totalSize.addAndGet(mwuAggSize.get());
+ topLevelWorkUnitsDigest.add(mwuAggSize.get());
+ } else {
+ long wuSize = workUnit.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE,
0);
+ totalSize.addAndGet(wuSize);
+ constituentWorkUnitsDigest.add(wuSize);
+ topLevelWorkUnitsDigest.add(wuSize);
+ }
+ }
+
+ // TODO - decide whether helpful/necessary to `.compress()`
+ topLevelWorkUnitsDigest.compress();
+ constituentWorkUnitsDigest.compress();
+ return new WorkUnitsSizeDigest(totalSize.get(), topLevelWorkUnitsDigest,
constituentWorkUnitsDigest);
+ }
+
+ public static int getConfiguredNumSizeSummaryQuantiles(State state) {
+ return
state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES,
GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
index 37d1f90477..52da7b5299 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -46,6 +46,7 @@ import org.apache.gobblin.runtime.SourceDecorator;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.workunit.WorkUnit;
+import
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
@@ -76,9 +77,14 @@ public class JobStateUtils {
return Help.loadFileSystemForUriForce(getFileSystemUri(jobState),
jobState);
}
- /** @return a new instance of {@link Source} identified by {@link
ConfigurationKeys#SOURCE_CLASS_KEY} */
+ /** @return the FQ class name, presumed configured as {@link
ConfigurationKeys#SOURCE_CLASS_KEY} */
+ public static String getSourceClassName(JobState jobState) {
+ return jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY);
+ }
+
+ /** @return a new instance of {@link Source}, identified by {@link
ConfigurationKeys#SOURCE_CLASS_KEY} */
public static Source<?, ?> createSource(JobState jobState) throws
ReflectiveOperationException {
- Class<?> sourceClass =
Class.forName(jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY));
+ Class<?> sourceClass = Class.forName(getSourceClassName(jobState));
log.info("Creating source: '{}'", sourceClass.getName());
Source<?, ?> source = new SourceDecorator<>(
Source.class.cast(sourceClass.newInstance()),
@@ -145,7 +151,10 @@ public class JobStateUtils {
return fs.makeQualified(jobOutputPath);
}
- /** write serialized {@link WorkUnit}s in parallel into files named after
the jobID and task IDs */
+ /**
+ * write serialized {@link WorkUnit}s in parallel into files named to tunnel
{@link org.apache.gobblin.util.WorkUnitSizeInfo}.
+ * {@link EagerFsDirBackedWorkUnitClaimCheckWorkload} (and possibly others)
may later recover such size info.
+ */
public static void writeWorkUnits(List<WorkUnit> workUnits, Path
workDirRootPath, JobState jobState, FileSystem fs)
throws IOException {
String jobId = jobState.getJobId();
@@ -159,7 +168,8 @@ public class JobStateUtils {
JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new
JobLauncherUtils.WorkUnitPathCalculator();
int i = 0;
for (WorkUnit workUnit : workUnits) {
- Path workUnitFile = pathCalculator.calcNextPath(workUnit, jobId,
targetDirPath);
+ // tunnel each WU's size info via its filename, for
`EagerFsDirBackedWorkUnitClaimCheckWorkload#extractTunneledWorkUnitSizeInfo`
+ Path workUnitFile =
pathCalculator.calcNextPathWithTunneledSizeInfo(workUnit, jobId, targetDirPath);
if (i++ == 0) {
log.info("Writing work unit file [first of {}]: '{}'",
workUnits.size(), workUnitFile);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
index ac75971d2b..24f6363093 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java
@@ -19,12 +19,19 @@ package org.apache.gobblin.temporal.ddm.work;
import java.net.URI;
import java.util.Comparator;
+import java.util.Optional;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.Id;
+import org.apache.gobblin.util.WorkUnitSizeInfo;
/**
@@ -33,6 +40,7 @@ import
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
*/
@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@lombok.ToString(callSuper = true)
+@Slf4j
public class EagerFsDirBackedWorkUnitClaimCheckWorkload extends
AbstractEagerFsDirBackedWorkload<WorkUnitClaimCheck> {
private EventSubmitterContext eventSubmitterContext;
@@ -43,8 +51,9 @@ public class EagerFsDirBackedWorkUnitClaimCheckWorkload
extends AbstractEagerFsD
@Override
protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) {
- // begin by setting all correlators to empty
- return new WorkUnitClaimCheck("", this.getFileSystemUri(),
fileStatus.getPath().toString(), this.eventSubmitterContext);
+ // begin by setting all correlators to empty string - later we'll
`acknowledgeOrdering()`
+ Path filePath = fileStatus.getPath();
+ return new WorkUnitClaimCheck("", this.getFileSystemUri(),
filePath.toString(), extractTunneledWorkUnitSizeInfo(filePath),
this.eventSubmitterContext);
}
@Override
@@ -58,4 +67,20 @@ public class EagerFsDirBackedWorkUnitClaimCheckWorkload
extends AbstractEagerFsD
// later, after the post-total-ordering indices are known, use each item's
index as its correlator
item.setCorrelator(Integer.toString(index));
}
+
+ /**
+ * @return the {@link WorkUnitSizeInfo}, when encoded in the filename;
otherwise {@link WorkUnitSizeInfo#empty()} when no size info about {@link
WorkUnit}
+ * @see
org.apache.gobblin.util.JobLauncherUtils.WorkUnitPathCalculator#calcNextPathWithTunneledSizeInfo(WorkUnit,
String, Path)
+ */
+ protected static WorkUnitSizeInfo extractTunneledWorkUnitSizeInfo(Path
filePath) {
+ String fileName = filePath.getName();
+ Optional<WorkUnitSizeInfo> optSizeInfo = Optional.empty();
+ try {
+ String maybeEncodedSizeInfo = Id.parse(fileName.substring(0,
fileName.lastIndexOf('.'))).getName(); // strip extension
+ optSizeInfo = WorkUnitSizeInfo.decode(maybeEncodedSizeInfo);
+ } catch (Exception e) { // log, but swallow any `Id.parse` error
+ log.warn("Filename NOT `Id.parse`able: '" + filePath + "' - " +
e.getMessage());
+ }
+ return optSizeInfo.orElse(WorkUnitSizeInfo.empty());
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
index 5f798055e2..f30998ae85 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
@@ -33,7 +33,10 @@ import lombok.RequiredArgsConstructor;
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class GenerateWorkUnitsResult {
+ // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
@NonNull private int generatedWuCount;
+ @NonNull private String sourceClass;
+ @NonNull private WorkUnitsSizeSummary workUnitsSizeSummary;
// Resources that the Temporal Job Launcher should clean up for Gobblin
temporary work directory paths in writers
@NonNull private Set<String> workDirPathsToDelete;
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
index ff3c9eb6f2..02ba87b3fa 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java
@@ -48,8 +48,7 @@ import
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@EqualsAndHashCode(callSuper = true) // to prevent findbugs warning - "equals
method overrides equals in superclass and may not be symmetric"
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
public class PriorJobStateWUProcessingSpec extends WUProcessingSpec {
- @NonNull
- private List<Tag<?>> tags = new ArrayList<>();
+ @NonNull private List<Tag<?>> tags = new ArrayList<>();
@NonNull private String metricsSuffix =
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX;
public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir,
EventSubmitterContext eventSubmitterContext) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
index 5cb2064b11..e454c9ceef 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
@@ -21,18 +21,19 @@ import java.net.URI;
import org.apache.hadoop.fs.Path;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.WorkUnitSizeInfo;
/**
@@ -47,6 +48,7 @@ public class WorkUnitClaimCheck implements FileSystemApt,
FileSystemJobStateful
@NonNull private String correlator;
@NonNull private URI fileSystemUri;
@NonNull private String workUnitPath;
+ @NonNull private WorkUnitSizeInfo workUnitSizeInfo;
@NonNull private EventSubmitterContext eventSubmitterContext;
@JsonIgnore // (because no-arg method resembles 'java bean property')
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
similarity index 50%
copy from
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
copy to
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
index 5f798055e2..3ea426c284 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
@@ -17,23 +17,34 @@
package org.apache.gobblin.temporal.ddm.work;
-import java.util.Set;
+import java.util.List;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
/**
- * Data structure representing the result of generating work units, where it
returns the number of generated work units and
- * the folders should be cleaned up after the full job execution is completed
+ * Total size, counts, and size distributions for a collection of {@link
MultiWorkUnit}s, both with regard to top-level (possibly multi) {@link
WorkUnit}s
+ * and individual constituent (purely {@link WorkUnit}s), where:
+ * * a top-level work unit is one with no parent - a root
+ * * a constituent work unit is one with no children - a leaf
+ * @see org.apache.gobblin.util.WorkUnitSizeInfo
*/
@Data
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
-public class GenerateWorkUnitsResult {
- @NonNull private int generatedWuCount;
- // Resources that the Temporal Job Launcher should clean up for Gobblin
temporary work directory paths in writers
- @NonNull private Set<String> workDirPathsToDelete;
+public class WorkUnitsSizeSummary {
+ // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
+ @NonNull private long totalSize;
+ @NonNull private long topLevelWorkUnitsCount;
+ @NonNull private long constituentWorkUnitsCount;
+ @NonNull private int quantilesCount;
+ @NonNull private double quantilesWidth;
+ @NonNull private List<Double> topLevelQuantilesMinSizes;
+ @NonNull private List<Double> constituentQuantilesMinSizes;
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 2aa2a7e649..1d5a63b736 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -49,6 +49,7 @@ import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.DirDeletionResult;
import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
@@ -97,14 +98,15 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
- timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit();
- EventTimer timer = timerFactory.createJobTimer();
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); //
update GaaS: `TimingEvent.JOB_START_TIME`
+ EventTimer jobSuccessTimer = timerFactory.createJobTimer();
Optional<GenerateWorkUnitsResult> generateWorkUnitResultsOpt =
Optional.empty();
WUProcessingSpec wuSpec = createProcessingSpec(jobProps,
eventSubmitterContext);
boolean isSuccessful = false;
try {
generateWorkUnitResultsOpt =
Optional.of(genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext));
- int numWUsGenerated =
generateWorkUnitResultsOpt.get().getGeneratedWuCount();
+ WorkUnitsSizeSummary wuSizeSummary =
generateWorkUnitResultsOpt.get().getWorkUnitsSizeSummary();
+ int numWUsGenerated =
safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary);
int numWUsCommitted = 0;
CommitStats commitStats = CommitStats.createEmpty();
if (numWUsGenerated > 0) {
@@ -112,18 +114,16 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
commitStats = processWUsWorkflow.process(wuSpec);
numWUsCommitted = commitStats.getNumCommittedWorkUnits();
}
- timer.stop();
+ jobSuccessTimer.stop();
isSuccessful = true;
return new ExecGobblinStats(numWUsGenerated, numWUsCommitted,
jobProps.getProperty(Help.USER_TO_PROXY_KEY),
commitStats.getDatasetStats());
} catch (Exception e) {
// Emit a failed GobblinTrackingEvent to record job failures
- timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit(); //
update GaaS: `ExecutionStatus.FAILED`; `TimingEvent.JOB_END_TIME`
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed Gobblin job %s",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
- e.getClass().getName(),
- e
- );
+ e.getClass().getName(), e);
} finally {
// TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid
flight
try {
@@ -138,9 +138,7 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
if (isSuccessful) {
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed cleaning Gobblin job %s",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
- e.getClass().getName(),
- e
- );
+ e.getClass().getName(), e);
}
log.error("Failed to cleanup work dirs", e);
}
@@ -210,4 +208,18 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
}
return resultSet;
}
+
+ /**
+ * Historical practice counted {@link
org.apache.gobblin.source.workunit.WorkUnit}s with {@code int} (see e.g. {@link
JobState#getTaskCount()}).
+ * Updated counting now uses {@code long}, although much code still presumes
{@code int}. While we don't presently anticipate jobs exceeding 2 billion
+ * `WorkUnit`s, if it were ever to happen, this method will fail-fast to
flag the need to address.
+ * @throws IllegalArgumentException if the count exceeds {@link
Integer#MAX_VALUE}
+ */
+ protected static int
safelyCastNumConstituentWorkUnitsOrThrow(WorkUnitsSizeSummary wuSizeSummary) {
+ long n = wuSizeSummary.getConstituentWorkUnitsCount();
+ if (n > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("Too many constituent WorkUnits (" +
n + ") - exceeds `Integer.MAX_VALUE`!");
+ }
+ return (int) n;
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index ccd2979491..6402e473bf 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -66,8 +66,8 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
try {
jobState = Help.loadJobState(workSpec, Help.loadFileSystem(workSpec));
} catch (Exception e) {
- log.error("Exception occured during loading jobState", e);
- throw new RuntimeException("Exception occured during loading jobState",
e);
+ log.error("Error loading jobState", e);
+ throw new RuntimeException("Error loading jobState", e);
}
searchAttributes =
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
index 1b0f79e78d..764725dd90 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
@@ -22,7 +22,7 @@ import java.util.List;
/** An opaque source of {@link
org.apache.gobblin.temporal.dynamic.ScalingDirective}s */
-public interface ScalingDirectiveSource extends Cloneable {
+public interface ScalingDirectiveSource {
/** @return {@link ScalingDirective}s - an impl. may choose to return all
known directives or to give only newer directives than previously returned */
List<ScalingDirective> getScalingDirectives() throws IOException;
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
index dde5555644..2a070e2ed9 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
@@ -167,7 +167,7 @@ public class WorkforcePlan {
return profiles.getOrThrow(profileName);
}
- /** @return [for testing/debugging] the baseline {@link WorkerProfile} - it
should NEVER {@link WorkforceProfiles.UnknownProfileException} */
+ /** @return [for testing/debugging] the baseline {@link WorkerProfile} - it
should NEVER throw {@link WorkforceProfiles.UnknownProfileException} */
@VisibleForTesting
WorkerProfile peepBaselineProfile() throws
WorkforceProfiles.UnknownProfileException {
return profiles.getOrThrow(WorkforceProfiles.BASELINE_NAME);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index c9d9e940ec..e1ac601986 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -30,7 +30,7 @@ import org.apache.gobblin.metrics.event.TimingEvent;
/**
- * Boiler plate for tracking elapsed time of events that is compatible with
{@link Workflow}
+ * Boilerplate for tracking elapsed time of events that is compatible with
{@link Workflow}
* by using activities to record time
*
* This class is very similar to {@link TimingEvent} but uses {@link Workflow}
compatible APIs. It's possible to refactor
@@ -106,9 +106,9 @@ public class TemporalEventTimer implements EventTimer {
* @return a timer that emits an event at the beginning of the job and a
completion event ends at the end of the job
*/
public TemporalEventTimer createJobTimer() {
- TemporalEventTimer startTimer =
create(TimingEvent.LauncherTimings.JOB_START);
+ TemporalEventTimer startTimer =
create(TimingEvent.LauncherTimings.JOB_START); // update GaaS:
`ExecutionStatus.RUNNING`
startTimer.stop(Instant.EPOCH); // Emit start job event containing a
stub end time
- // GaaS job status monitor tracks for SUCCEEDED events or FAILED events
for job completion
+ // [upon `.stop()`] update GaaS: `ExecutionStatus.COMPLETE`,
`TimingEvent.JOB_END_TIME`:
return create(TimingEvent.LauncherTimings.JOB_SUCCEEDED,
startTimer.startTime);
}
}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
index 86c5ac12de..8c94783a7d 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java
@@ -24,10 +24,12 @@ import java.util.Set;
import org.testng.Assert;
import org.testng.annotations.Test;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
public class GenerateWorkUnitsImplTest {
@@ -36,7 +38,7 @@ public class GenerateWorkUnitsImplTest {
public void testFetchesWorkDirsFromWorkUnits() {
List<WorkUnit> workUnits = new ArrayList<>();
for (int i = 0; i < 5; i++) {
- WorkUnit workUnit = new WorkUnit();
+ WorkUnit workUnit = WorkUnit.createEmpty();
workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/" + i);
workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/" + i);
workUnit.setProp("qualitychecker.row.err.file",
"/tmp/jobId/row-err/file");
@@ -54,9 +56,9 @@ public class GenerateWorkUnitsImplTest {
public void testFetchesWorkDirsFromMultiWorkUnits() {
List<WorkUnit> workUnits = new ArrayList<>();
for (int i = 0; i < 5; i++) {
- MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
+ MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
for (int j = 0; j < 3; j++) {
- WorkUnit workUnit = new WorkUnit();
+ WorkUnit workUnit = WorkUnit.createEmpty();
workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/");
workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/");
workUnit.setProp("qualitychecker.row.err.file",
"/tmp/jobId/row-err/file");
@@ -76,9 +78,9 @@ public class GenerateWorkUnitsImplTest {
public void testFetchesUniqueWorkDirsFromMultiWorkUnits() {
List<WorkUnit> workUnits = new ArrayList<>();
for (int i = 0; i < 5; i++) {
- MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
+ MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
for (int j = 0; j < 3; j++) {
- WorkUnit workUnit = new WorkUnit();
+ WorkUnit workUnit = WorkUnit.createEmpty();
// Each MWU will have its own staging and output dir
workUnit.setProp("writer.staging.dir", "/tmp/jobId/" + i +
"/task-staging/");
workUnit.setProp("writer.output.dir", "/tmp/jobId/" + i +
"task-output/");
@@ -94,4 +96,87 @@ public class GenerateWorkUnitsImplTest {
Set<String> output =
GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream);
Assert.assertEquals(output.size(), 11);
}
+
+ @Test
+ public void testDigestWorkUnitsSize() {
+ int numSingleWorkUnits = 5;
+ int numMultiWorkUnits = 15;
+ long singleWorkUnitSizeFactor = 100L;
+ long multiWorkUnitSizeFactor = 70L;
+ List<WorkUnit> workUnits = new ArrayList<>();
+
+ // configure size of non-multi-work-units (increments of
`singleWorkUnitSizeFactor`, starting from 0)
+ for (int i = 0; i < numSingleWorkUnits; i++) {
+ workUnits.add(createWorkUnitOfSize(i * singleWorkUnitSizeFactor));
+ }
+
+ // configure size of multi-work-units, each containing between 1 and 4
sub-work-unit children
+ for (int i = 0; i < numMultiWorkUnits; i++) {
+ MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
+ int subWorkUnitCount = 1 + (i % 4); // 1 to 4
+ for (int j = 0; j < subWorkUnitCount; j++) {
+ multiWorkUnit.addWorkUnit(createWorkUnitOfSize((j + 1) *
multiWorkUnitSizeFactor));
+ }
+ workUnits.add(multiWorkUnit);
+ }
+
+ // calc expectations
+ long expectedTotalSize = 0L;
+ int expectedNumTopLevelWorkUnits = numSingleWorkUnits + numMultiWorkUnits;
+ int expectedNumConstituentWorkUnits = numSingleWorkUnits;
+ for (int i = 0; i < numSingleWorkUnits; i++) {
+ expectedTotalSize += i * singleWorkUnitSizeFactor;
+ }
+ for (int i = 0; i < numMultiWorkUnits; i++) {
+ int numSubWorkUnitsThisMWU = 1 + (i % 4);
+ expectedNumConstituentWorkUnits += numSubWorkUnitsThisMWU;
+ for (int j = 0; j < numSubWorkUnitsThisMWU; j++) {
+ expectedTotalSize += (j + 1) * multiWorkUnitSizeFactor;
+ }
+ }
+
+ GenerateWorkUnitsImpl.WorkUnitsSizeDigest wuSizeDigest =
GenerateWorkUnitsImpl.digestWorkUnitsSize(workUnits);
+
+ Assert.assertEquals(wuSizeDigest.getTotalSize(), expectedTotalSize);
+ Assert.assertEquals(wuSizeDigest.getTopLevelWorkUnitsSizeDigest().size(),
expectedNumTopLevelWorkUnits);
+
Assert.assertEquals(wuSizeDigest.getConstituentWorkUnitsSizeDigest().size(),
expectedNumConstituentWorkUnits);
+
+ int numQuantilesDesired = expectedNumTopLevelWorkUnits; // for simpler
math during quantile verification (below)
+ WorkUnitsSizeSummary wuSizeInfo =
wuSizeDigest.asSizeSummary(numQuantilesDesired);
+ Assert.assertEquals(wuSizeInfo.getTotalSize(), expectedTotalSize);
+ Assert.assertEquals(wuSizeInfo.getTopLevelWorkUnitsCount(),
expectedNumTopLevelWorkUnits);
+ Assert.assertEquals(wuSizeInfo.getConstituentWorkUnitsCount(),
expectedNumConstituentWorkUnits);
+ Assert.assertEquals(wuSizeInfo.getQuantilesCount(), numQuantilesDesired);
+ Assert.assertEquals(wuSizeInfo.getQuantilesWidth(), 1.0 /
expectedNumTopLevelWorkUnits);
+ Assert.assertEquals(wuSizeInfo.getTopLevelQuantilesMinSizes().size(),
numQuantilesDesired); // same as `asSizeInfo` param
+ Assert.assertEquals(wuSizeInfo.getConstituentQuantilesMinSizes().size(),
numQuantilesDesired); // same as `asSizeInfo` param
+
+ // expected sizes for (n=5) top-level non-multi-WUs: (1x) 0, (1x) 100,
(1x) 200, (1x) 300, (1x) 400
+ // expected sizes for (n=15) top-level multi-WUs: [a] (4x) 70; [b] (4x)
210 (= 70+140); [c] (4x) 420 (= 70+140+210); [d] (3x) 700 (= 70+140+210+280)
+ Assert.assertEquals(wuSizeInfo.getTopLevelQuantilesMinSizes().toArray(),
+ new Double[]{
+ 70.0, 70.0, 70.0, 70.0, // 4x MWU [a]
+ 100.0, 200.0, // non-MWU [2, 3]
+ 210.0, 210.0, 210.0, 210.0, // 4x MWU [b]
+ 300.0, 400.0, // non-MWU [4, 5]
+ 420.0, 420.0, 420.0, 420.0, // 4x MWU [c]
+ 700.0, 700.0, 700.0, 700.0 }); // 3x MWU [d] + "100-percentile"
(all WUs)
+
+ // expected sizes for (n=36) constituents from multi-WUs: [m] (15x =
4+4+4+3) 70; [n] (11x = 4+4+3) 140; [o] (7x = 4+3) 210; [p] (3x) 280
+ Assert.assertEquals(wuSizeInfo.getConstituentQuantilesMinSizes().toArray(),
+ new Double[]{
+ 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, // (per 15x MWU [m]) -
15/41 * 20 ~ 7.3
+ 100.0, // non-MWU [2]
+ 140.0, 140.0, 140.0, 140.0, 140.0, // (per 11x MWU [n]) -
(15+1+11/41) * 20 ~ 13.2 | 13.2 - 8 = 5.2
+ 200.0, // non-MWU [3]
+ 210.0, 210.0, 210.0, // (per 7x MWU [o]) - (15+1+11+1+7/41) * 20 ~
17.0 | 17.0 - 14 = 3
+ 280.0, 280.0, // 3x MWU [p] ... (15+1+11+1+7+3/41) * 20 ~ 18.5 |
18.5 - 17 = 2
+ 400.0 }); // with only one 20-quantile remaining, non-MWU [5]
completes the "100-percentile" (all WUs)
+ }
+
+ public static WorkUnit createWorkUnitOfSize(long size) {
+ WorkUnit workUnit = WorkUnit.createEmpty();
+ workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size);
+ return workUnit;
+ }
}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
new file mode 100644
index 0000000000..6df60857eb
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.mockito.Mockito;
+import org.mockito.MockedStatic;
+
+import org.apache.gobblin.temporal.util.nesting.work.Workload;
+import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.WorkUnitSizeInfo;
+
+
+/** Tests for {@link EagerFsDirBackedWorkUnitClaimCheckWorkload} */
+public class EagerFsDirBackedWorkUnitClaimCheckWorkloadTest {
+
+ private EagerFsDirBackedWorkUnitClaimCheckWorkload workload;
+ private EventSubmitterContext eventSubmitterContext;
+ private FileSystem mockFileSystem;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ URI fileSystemUri = new URI("hdfs://localhost:9000");
+ String hdfsDir = "/test/dir";
+ eventSubmitterContext = Mockito.mock(EventSubmitterContext.class);
+ workload = Mockito.spy(new
EagerFsDirBackedWorkUnitClaimCheckWorkload(fileSystemUri, hdfsDir,
eventSubmitterContext));
+ mockFileSystem = Mockito.mock(FileSystem.class);
+
+ MockedStatic<HadoopUtils> mockedHadoopUtils =
Mockito.mockStatic(HadoopUtils.class);
+ mockedHadoopUtils.when(() -> HadoopUtils.getFileSystem(Mockito.any(),
Mockito.any())).thenReturn(mockFileSystem);
+ }
+
+ @Test
+ public void testExtractWorkUnitSizeInfo() throws Exception {
+ // mock `FileStatus` having an encoded WorkUnitSizeInfo as filename
+ List<FileStatus> fileStatuses = Arrays.asList(
+
createMockFileStatus("/test/dir/multitask_n=3-total=100-median=100.0-mean=100.0-stddev=0.0_0.mwu"),
+
createMockFileStatus("/test/dir/multitask_n=2-total=200-median=100.0-mean=100.0-stddev=0.0_1.mwu"),
+
createMockFileStatus("/test/dir/task_n=1-total=300-median=100.0-mean=100.0-stddev=0.0_5.wu"),
+
createMockFileStatus("/test/dir/multitask_n=4-total=400-median=100.0-mean=100.0-stddev=0.0_30.mwu"),
+
createMockFileStatus("/test/dir/task_n=1-total=500-median=100.0-mean=100.0-stddev=0.0_2.wu")
+ );
+
+ Mockito.when(mockFileSystem.listStatus(Mockito.any(Path.class),
Mockito.any(PathFilter.class)))
+ .thenReturn(fileStatuses.toArray(new FileStatus[0]));
+
+ Optional<Workload.WorkSpan<WorkUnitClaimCheck>> span = workload.getSpan(0,
5);
+ Assert.assertTrue(span.isPresent());
+
+ Workload.WorkSpan<WorkUnitClaimCheck> workSpan = span.get();
+ Assert.assertEquals(workSpan.getNumElems(), fileStatuses.size());
+
+ Iterable<WorkUnitClaimCheck> workSpanIterable = () -> workSpan;
+ List<WorkUnitClaimCheck> wuClaimChecks =
StreamSupport.stream(workSpanIterable.spliterator(), false)
+ .collect(Collectors.toList());
+
+ // note: since ordering is based on filename, it will not be the same as
`fileStatuses` above
+ for (WorkUnitClaimCheck workUnitClaimCheck : wuClaimChecks) {
+ WorkUnitSizeInfo sizeInfo = workUnitClaimCheck.getWorkUnitSizeInfo();
+ Assert.assertNotNull(sizeInfo);
+ int n = sizeInfo.getNumConstituents();
+ long expectedTotalSize = (n == 1 &&
workUnitClaimCheck.getWorkUnitPath().endsWith("_5.wu")) ? 300 :
+ n == 1 ? 500 :
+ n == 2 ? 200 :
+ n == 3 ? 100 :
+ n == 4 ? 400 :
+ -1; // unexpected (sentinel)
+ Assert.assertEquals(sizeInfo.getTotalSize(), expectedTotalSize);
+ }
+ }
+
+ private static FileStatus createMockFileStatus(String path) {
+ FileStatus fileStatus = Mockito.mock(FileStatus.class);
+ Path filePath = new Path(path);
+ Mockito.when(fileStatus.getPath()).thenReturn(filePath);
+ return fileStatus;
+ }
+}
+
diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle
index b916f214cd..80e73afcfd 100644
--- a/gobblin-utility/build.gradle
+++ b/gobblin-utility/build.gradle
@@ -40,6 +40,7 @@ dependencies {
compile externalDependency.metricsCore
compile externalDependency.guavaretrying
compile externalDependency.guice
+ compile externalDependency.tdigest
compile externalDependency.typesafeConfig
compile externalDependency.commonsPool
compile externalDependency.hadoopClientCommon
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index f70f11f752..d1f97681ee 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
+import lombok.extern.slf4j.Slf4j;
+
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.Cache;
@@ -40,8 +42,6 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -66,13 +66,26 @@ public class JobLauncherUtils {
public static class WorkUnitPathCalculator {
private final AtomicInteger nextMultiWorkUnitTaskId = new AtomicInteger(0);
- // Serialize each work unit into a file named after the task ID
+ /** @return `Path` beneath `basePath` to serialize `workUnit`, with file
named after the task ID (itself named after the job ID) */
public Path calcNextPath(WorkUnit workUnit, String jobId, Path basePath) {
String workUnitFileName = workUnit.isMultiWorkUnit()
? JobLauncherUtils.newMultiTaskId(jobId,
nextMultiWorkUnitTaskId.getAndIncrement()) +
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
: workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) +
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
return new Path(basePath, workUnitFileName);
}
+
+ /**
+ * Calc where to serialize {@link WorkUnit}, using a filename that tunnels
{@link WorkUnitSizeInfo} (vs. repeating the task/job ID, as was legacy practice)
+ * This provides a direct and simple way to tunnel limited, but crucial,
metadata that incurs no additional access cost (nor adds FS load).
+ * @return `Path` beneath `basePath` to serialize `workUnit`
+ */
+ public Path calcNextPathWithTunneledSizeInfo(WorkUnit workUnit, String
jobId, Path basePath) {
+ String encodedSizeInfo = WorkUnitSizeInfo.forWorkUnit(workUnit).encode();
+ String workUnitFileName = workUnit.isMultiWorkUnit()
+ ? Id.MultiTask.create(encodedSizeInfo,
nextMultiWorkUnitTaskId.getAndIncrement()) +
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION
+ : Id.Task.create(encodedSizeInfo,
workUnit.getPropAsInt(ConfigurationKeys.TASK_KEY_KEY)) +
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
+ return new Path(basePath, workUnitFileName);
+ }
}
/**
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
new file mode 100644
index 0000000000..107590407a
--- /dev/null
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.tdunning.math.stats.TDigest;
+
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Bare-bones size information about a {@link WorkUnit}, possibly a {@link
MultiWorkUnit}, where a constituent work unit is one with no children - a leaf.
+ *
+ * Measurement currently requires the `WorkUnit` to define {@link
ServiceConfigKeys#WORK_UNIT_SIZE}, otherwise sizes will be 0 with merely the
count of
+ * constituent `WorkUnits`. For the most part, at present, that key is
supplied only by {@link org.apache.gobblin.data.management.copy.CopySource}.
+ * Nonetheless, the "contract" for any {@link
org.apache.gobblin.source.Source} is both clear and reasonable: just add "size"
to your `WorkUnit`s to
+ * participate.
+ *
+ * Some sources might count bytes, others num records, possibly with those
size-weighted; and of course not all sources extract a definite
+ * amount of data, known up front. In such cases, the {@link
#numConstituents} (aka. parallelism potential) may be most informative.
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class WorkUnitSizeInfo {
+ // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
+ @NonNull private int numConstituents;
+ @NonNull private long totalSize;
+ @NonNull private double medianSize;
+ @NonNull private double meanSize;
+ @NonNull private double stddevSize;
+
+ /** @return the 'zero' {@link WorkUnitSizeInfo} */
+ public static WorkUnitSizeInfo empty() {
+ return new WorkUnitSizeInfo(0, 0, 0.0, 0.0, 0.0);
+ }
+
+ /**
+ * convenience factory to measure a {@link WorkUnit} - preferable to direct
ctor call
+ * @return {@link #empty()} when the `WorkUnit` is not measurable by
defining {@link ServiceConfigKeys#WORK_UNIT_SIZE}
+ */
+ public static WorkUnitSizeInfo forWorkUnit(WorkUnit workUnit) {
+ // NOTE: redundant `instanceof` merely to appease FindBugs -
"Unchecked/unconfirmed cast ..."
+ if (!workUnit.isMultiWorkUnit() || !(workUnit instanceof MultiWorkUnit)) {
+ long wuSize = workUnit.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE,
0);
+ return new WorkUnitSizeInfo(1, wuSize, wuSize, wuSize, 0.0);
+ } else {
+ // WARNING/TODO: NOT resilient to nested multi-workunits... should it be?
+ List<WorkUnit> subWorkUnitsList = ((MultiWorkUnit)
workUnit).getWorkUnits();
+ if (subWorkUnitsList.isEmpty()) {
+ return WorkUnitSizeInfo.empty();
+ }
+ int n = subWorkUnitsList.size();
+ TDigest constituentWorkUnitsDigest = TDigest.createDigest(100);
+ AtomicLong totalSize = new AtomicLong(0L);
+ List<Long> subWorkUnitSizes = subWorkUnitsList.stream().mapToLong(wu ->
wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0))
+ .boxed().collect(Collectors.toList());
+ subWorkUnitSizes.forEach(wuSize -> {
+ constituentWorkUnitsDigest.add(wuSize);
+ totalSize.addAndGet(wuSize);
+ });
+ double mean = totalSize.get() * 1.0 / n;
+ double variance = subWorkUnitSizes.stream().mapToDouble(wuSize -> {
+ double meanDiff = wuSize - mean;
+ return meanDiff * meanDiff;
+ }).sum() / n;
+ double stddev = Math.sqrt(variance);
+ double median = constituentWorkUnitsDigest.quantile(0.5);
+ return new WorkUnitSizeInfo(n, totalSize.get(), median, mean, stddev);
+ }
+ }
+
+ /**
+ * @return stringified, human-readable prop+value encoding that may be
inverted with {@link #decode(String)}
+ *
+ * NOTE: The resulting encoded form will be between 42 and 117 chars:
+ * - presuming - a 32-bit int (max 10 digits), a 64-bit long (max 19
digits), and a 64-bit double (max 19 digits)
+ * * 86 digits maximum for the values - 1*int + 1*long + 3*double = 10 +
19 + 3*19 = 86
+ * * 11 digits minimum for the values - 1*int + 1*long + 3*double = 1 + 1
+ 3*3 = 11
+ * * 22 digits for the names [1+5+6+4+6]
+ * * 9 digits for additional syntax [5+4]
+ * = 117 digits (max)
+ * = 42 digits (min)
+ */
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ public String encode() {
+ return String.format("n=%d-total=%d-median=%.2f-mean=%.2f-stddev=%.2f",
+ numConstituents, totalSize, medianSize, meanSize, stddevSize);
+ }
+
+ private static final String DECODING_REGEX =
"^n=(\\d+)-total=(\\d+)-median=(\\d+(?:\\.\\d+)?)-mean=(\\d+(?:\\.\\d+)?)-stddev=(\\d+(?:\\.\\d+)?)$";
+ private static final Pattern decodingPattern =
Pattern.compile(DECODING_REGEX);
+
+ /** @return the parsed size info, when `encoded` is in {@link
WorkUnitSizeInfo#encode()}-compatible form; otherwise {@link Optional#empty()}
*/
+ public static Optional<WorkUnitSizeInfo> decode(String encoded) {
+ Matcher decoding = decodingPattern.matcher(encoded);
+ if (!decoding.matches()) {
+ return Optional.empty();
+ } else {
+ return Optional.of(new WorkUnitSizeInfo(
+ Integer.parseInt(decoding.group(1)),
+ Long.parseLong(decoding.group(2)),
+ Double.parseDouble(decoding.group(3)),
+ Double.parseDouble(decoding.group(4)),
+ Double.parseDouble(decoding.group(5))
+ ));
+ }
+ }
+}
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java
index c11fcc460c..9bdae9ffea 100644
---
a/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java
@@ -26,12 +26,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.LoggerFactory;
+
import org.testng.Assert;
import org.testng.annotations.Test;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.Extract.TableType;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -48,6 +50,8 @@ public class JobLauncherUtilsTest {
private static final String JOB_NAME = "foo";
private static final Pattern PATTERN = Pattern.compile("job_" + JOB_NAME +
"_\\d+");
+ private final Path wuBasePath = new Path("/abs/base/path");
+ private final String wuPathJobId = Id.Task.create("test_wu_path_JobId",
6).toString();
private String jobId;
@Test
@@ -87,6 +91,92 @@ public class JobLauncherUtilsTest {
Assert.assertEquals(JobLauncherUtils.flattenWorkUnits(workUnitsAndMultiWorkUnits).size(),
9);
}
+ /** A lone (non-multi) {@link WorkUnit}: file name derives solely from its task ID — no sequence suffix. */
+ @Test
+ public void testCalcNextPathSingleWorkUnit() {
+ WorkUnit workUnit = WorkUnit.createEmpty();
+ workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "task1");
+
+ JobLauncherUtils.WorkUnitPathCalculator pathCalc = new
JobLauncherUtils.WorkUnitPathCalculator();
+ Path resultPath = pathCalc.calcNextPath(workUnit, wuPathJobId, wuBasePath);
+
+ Assert.assertEquals(resultPath, new Path(wuBasePath, "task1" +
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION));
+ }
+
+ /**
+ * A lone {@link WorkUnit} with "tunneled" size info: the file name embeds the
+ * {@link WorkUnitSizeInfo#encode()} form plus the task key (here "789"), rather than the task ID.
+ */
+ @Test
+ public void testCalcNextPathWithTunneledSizeInfoSingleWorkUnit() {
+ WorkUnit workUnit = WorkUnit.createEmpty();
+ workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "task1");
+ workUnit.setProp(ConfigurationKeys.TASK_KEY_KEY, "789");
+ workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, "120");
+
+ JobLauncherUtils.WorkUnitPathCalculator pathCalc = new JobLauncherUtils.WorkUnitPathCalculator();
+ // use the local `wuPathJobId`, as every sibling test does — NOT the `jobId` field, which is
+ // assigned only by an unrelated test method and would make this test order-dependent
+ // (the expected path below does not incorporate the job id, so the assertion is unaffected)
+ Path resultPath = pathCalc.calcNextPathWithTunneledSizeInfo(workUnit, wuPathJobId, wuBasePath);
+
+ String encodedSizeInfo = WorkUnitSizeInfo.forWorkUnit(workUnit).encode();
+ String expectedFileName = Id.Task.PREFIX + "_" + encodedSizeInfo + "_789"
+ JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
+ Assert.assertEquals(resultPath, new Path(wuBasePath, expectedFileName));
+ }
+
+ /**
+ * {@link MultiWorkUnit}s: each call to `calcNextPath` appends the next value of a
+ * per-calculator monotonic sequence (_0, _1, _2, ...), and the file name embeds the
+ * job id with its "task_" prefix stripped.
+ */
+ @Test
+ public void testCalcNextPathMultiWorkUnit() {
+ MultiWorkUnit multiWorkUnitA = MultiWorkUnit.createEmpty();
+ for (int i = 0; i < 5; i++) {
+ WorkUnit workUnit = WorkUnit.createEmpty();
+ workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_A" +
i);
+ multiWorkUnitA.addWorkUnit(workUnit);
+ }
+ MultiWorkUnit multiWorkUnitB = MultiWorkUnit.createEmpty();
+ for (int i = 0; i < 3; i++) {
+ WorkUnit workUnit = WorkUnit.createEmpty();
+ workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_B" +
i);
+ multiWorkUnitB.addWorkUnit(workUnit);
+ }
+
+ JobLauncherUtils.WorkUnitPathCalculator pathCalc = new
JobLauncherUtils.WorkUnitPathCalculator();
+ // sequence counter is shared across distinct multi-work-units (A gets _0; B gets _1 then _2)
+ Path resultPathA0 = pathCalc.calcNextPath(multiWorkUnitA, wuPathJobId,
wuBasePath);
+ Path resultPathB1 = pathCalc.calcNextPath(multiWorkUnitB, wuPathJobId,
wuBasePath);
+ Path resultPathB2 = pathCalc.calcNextPath(multiWorkUnitB, wuPathJobId,
wuBasePath);
+
+ String expectedFileNameA0 = Id.MultiTask.PREFIX + "_" +
wuPathJobId.replace("task_", "") + "_0" +
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+ String expectedFileNameB1 = Id.MultiTask.PREFIX + "_" +
wuPathJobId.replace("task_", "") + "_1" +
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+ String expectedFileNameB2 = Id.MultiTask.PREFIX + "_" +
wuPathJobId.replace("task_", "") + "_2" +
JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+ Assert.assertEquals(resultPathA0, new Path(wuBasePath,
expectedFileNameA0));
+ Assert.assertEquals(resultPathB1, new Path(wuBasePath,
expectedFileNameB1));
+ Assert.assertEquals(resultPathB2, new Path(wuBasePath,
expectedFileNameB2));
+ }
+
+ /**
+ * {@link MultiWorkUnit}s with "tunneled" size info: like the plain multi-work-unit case
+ * (shared monotonic _0/_1/_2 suffix), but the file name embeds the aggregate
+ * {@link WorkUnitSizeInfo#encode()} form in place of the job id.
+ */
+ @Test
+ public void testCalcNextPathWithTunneledSizeInfoMultiWorkUnit() {
+ MultiWorkUnit multiWorkUnitA = MultiWorkUnit.createEmpty();
+ for (int i = 0; i < 5; i++) {
+ WorkUnit workUnit = WorkUnit.createEmpty();
+ workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_A" +
i);
+ workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, String.valueOf((i +
1) * 75));
+ multiWorkUnitA.addWorkUnit(workUnit);
+ }
+ MultiWorkUnit multiWorkUnitB = MultiWorkUnit.createEmpty();
+ for (int i = 0; i < 3; i++) {
+ WorkUnit workUnit = WorkUnit.createEmpty();
+ workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_B" +
i);
+ workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, String.valueOf((i +
1) * 66));
+ multiWorkUnitB.addWorkUnit(workUnit);
+ }
+
+ JobLauncherUtils.WorkUnitPathCalculator pathCalc = new
JobLauncherUtils.WorkUnitPathCalculator();
+ Path resultPathA0 =
pathCalc.calcNextPathWithTunneledSizeInfo(multiWorkUnitA, wuPathJobId,
wuBasePath);
+ Path resultPathB1 =
pathCalc.calcNextPathWithTunneledSizeInfo(multiWorkUnitB, wuPathJobId,
wuBasePath);
+ Path resultPathB2 =
pathCalc.calcNextPathWithTunneledSizeInfo(multiWorkUnitB, wuPathJobId,
wuBasePath);
+
+ String encodedSizeInfoA =
WorkUnitSizeInfo.forWorkUnit(multiWorkUnitA).encode();
+ String encodedSizeInfoB =
WorkUnitSizeInfo.forWorkUnit(multiWorkUnitB).encode();
+ String expectedFileNameA0 = Id.MultiTask.PREFIX + "_" + encodedSizeInfoA +
"_0" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+ String expectedFileNameB1 = Id.MultiTask.PREFIX + "_" + encodedSizeInfoB +
"_1" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+ String expectedFileNameB2 = Id.MultiTask.PREFIX + "_" + encodedSizeInfoB +
"_2" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
+ Assert.assertEquals(resultPathA0, new Path(wuBasePath,
expectedFileNameA0));
+ Assert.assertEquals(resultPathB1, new Path(wuBasePath,
expectedFileNameB1));
+ Assert.assertEquals(resultPathB2, new Path(wuBasePath,
expectedFileNameB2));
+ }
+
@Test
public void testDeleteStagingData() throws IOException {
FileSystem fs = FileSystem.getLocal(new Configuration());
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/WorkUnitSizeInfoTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/WorkUnitSizeInfoTest.java
new file mode 100644
index 0000000000..ab2b689ea8
--- /dev/null
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/util/WorkUnitSizeInfoTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import java.util.Optional;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/** Tests for {@link WorkUnitSizeInfo}. */
+public class WorkUnitSizeInfoTest {
+
+ /** Aggregate stats over a {@link MultiWorkUnit} whose constituents are sized 100, 200, ..., 2000. */
+ @Test
+ public void testMultiWorkUnitSizeInfo() {
+ List<WorkUnit> workUnits = new ArrayList<>();
+ long totalSize = 0;
+ for (int i = 1; i <= 20; i++) {
+ long size = i * 100L;
+ totalSize += size;
+ WorkUnit wu = WorkUnit.createEmpty();
+ wu.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size);
+ workUnits.add(wu);
+ }
+ MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
+ multiWorkUnit.addWorkUnits(workUnits);
+
+ double expectedMean = totalSize * 1.0 / workUnits.size();
+
+ WorkUnitSizeInfo sizeInfo = WorkUnitSizeInfo.forWorkUnit(multiWorkUnit);
+
+ Assert.assertEquals(sizeInfo.getNumConstituents(), 20);
+ Assert.assertEquals(sizeInfo.getTotalSize(), totalSize);
+ // NOTE(review): the exact statistical median of this series is 1050; the asserted 1100
+ // presumably reflects an approximate quantile estimate (a t-digest dependency is added
+ // by this same change) — confirm against the WorkUnitSizeInfo implementation
+ Assert.assertEquals(sizeInfo.getMedianSize(), 1100.0, 0.1);
+ Assert.assertEquals(sizeInfo.getMeanSize(), expectedMean, 0.1);
+ // 576.628 is the population (not sample) std-dev of 100, 200, ..., 2000
+ Assert.assertEquals(sizeInfo.getStddevSize(), 576.628, 0.1);
+ }
+
+ /** Degenerate single-work-unit case: median == mean == total, std-dev == 0. */
+ @Test
+ public void testSingleWorkUnitSizeInfo() {
+ WorkUnit workUnit = WorkUnit.createEmpty();
+ workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, 5432L);
+
+ WorkUnitSizeInfo sizeInfo = WorkUnitSizeInfo.forWorkUnit(workUnit);
+
+ Assert.assertEquals(sizeInfo.getNumConstituents(), 1);
+ Assert.assertEquals(sizeInfo.getTotalSize(), 5432L);
+ Assert.assertEquals(sizeInfo.getMedianSize(), 5432.0, 0.1);
+ Assert.assertEquals(sizeInfo.getMeanSize(), 5432.0, 0.1);
+ Assert.assertEquals(sizeInfo.getStddevSize(), 0.0, 0.1);
+ }
+
+ /** encode() then decode() round-trips (doubles rendered at two decimal places, e.g. 576.628 -> "576.63"). */
+ @Test
+ public void testEncodeThenDecodeRoundTrip() {
+ WorkUnitSizeInfo original = new WorkUnitSizeInfo(20, 21000L, 1100.0,
1050.0, 576.628);
+ String encoded = original.encode();
+
+ Assert.assertEquals(encoded,
"n=20-total=21000-median=1100.00-mean=1050.00-stddev=576.63");
+
+ Optional<WorkUnitSizeInfo> optDecoded = WorkUnitSizeInfo.decode(encoded);
+ Assert.assertTrue(optDecoded.isPresent());
+ WorkUnitSizeInfo decoded = optDecoded.get();
+ Assert.assertEquals(decoded.getNumConstituents(),
original.getNumConstituents());
+ Assert.assertEquals(decoded.getTotalSize(), original.getTotalSize());
+ Assert.assertEquals(decoded.getMedianSize(), original.getMedianSize(),
0.1);
+ Assert.assertEquals(decoded.getMeanSize(), original.getMeanSize(), 0.1);
+ Assert.assertEquals(decoded.getStddevSize(), original.getStddevSize(),
0.1);
+ }
+
+ /** decode() also tolerates double-valued fields written without a decimal point. */
+ @Test
+ public void testDecodeFromIntFormattedDoubles() {
+ String encoded = "n=20-total=12345-median=1111-mean=617-stddev=543";
+
+ Optional<WorkUnitSizeInfo> optDecoded = WorkUnitSizeInfo.decode(encoded);
+ Assert.assertTrue(optDecoded.isPresent());
+ WorkUnitSizeInfo decoded = optDecoded.get();
+ Assert.assertEquals(decoded.getNumConstituents(), 20);
+ Assert.assertEquals(decoded.getTotalSize(), 12345L);
+ Assert.assertEquals(decoded.getMedianSize(), 1111.0);
+ Assert.assertEquals(decoded.getMeanSize(), 617.0);
+ Assert.assertEquals(decoded.getStddevSize(), 543.0);
+ }
+}
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index 966ef824d7..e4b328c5e1 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -220,6 +220,7 @@ ext.externalDependency = [
"org.slf4j:slf4j-log4j12:" + slf4jVersion
],
"postgresConnector": "org.postgresql:postgresql:42.1.4",
+ "tdigest": "com.tdunning:t-digest:3.3",
"testContainers": "org.testcontainers:testcontainers:1.17.3",
"testContainersMysql": "org.testcontainers:mysql:1.17.3",
"xz": "org.tukaani:xz:1.8"