Repository: incubator-gobblin Updated Branches: refs/heads/master 249d5a19c -> 1c2d30b06
[GOBBLIN-492] Refactor LoopingDatasetFinderSource to make it extensible Closes #2363 from autumnust/refactorIteratorFactory Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1c2d30b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1c2d30b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1c2d30b0 Branch: refs/heads/master Commit: 1c2d30b06752da7d6332631498feeeb75d424d80 Parents: 249d5a1 Author: Lei Sun <[email protected]> Authored: Tue May 15 14:41:51 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue May 15 14:41:51 2018 -0700 ---------------------------------------------------------------------- .../source/LoopingDatasetFinderSource.java | 74 +++++++++++++------- 1 file changed, 47 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c2d30b0/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java index 50b24de..8e39bd2 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java @@ -30,7 +30,6 @@ import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.PeekingIterator; -import com.typesafe.config.Config; import org.apache.gobblin.configuration.ConfigurationKeys; 
import org.apache.gobblin.configuration.SourceState; @@ -44,7 +43,6 @@ import org.apache.gobblin.runtime.task.NoopTask; import org.apache.gobblin.source.workunit.BasicWorkUnitStream; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.source.workunit.WorkUnitStream; -import org.apache.gobblin.util.ConfigUtils; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; @@ -93,9 +91,8 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour public WorkUnitStream getWorkunitStream(SourceState state, boolean isDatasetStateStoreEnabled) { this.isDatasetStateStoreEnabled = isDatasetStateStoreEnabled; try { - int maxWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, MAX_WORK_UNITS_PER_RUN); - Preconditions.checkArgument(maxWorkUnits > 0, "Max work units must be greater than 0!"); - Config config = ConfigUtils.propertiesToConfig(state.getProperties()); + int maximumWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, MAX_WORK_UNITS_PER_RUN); + Preconditions.checkArgument(maximumWorkUnits > 0, "Max work units must be greater than 0!"); List<WorkUnitState> previousWorkUnitStates = (this.isDatasetStateStoreEnabled) ? 
state .getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN) @@ -109,42 +106,50 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour } } + IterableDatasetFinder datasetsFinder = createDatasetsFinder(state); + + Stream<Dataset> datasetStream = + datasetsFinder.getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator); + datasetStream = sortStreamLexicographically(datasetStream); + String previousDatasetUrnWatermark = null; String previousPartitionUrnWatermark = null; if (maxWorkUnit.isPresent() && !maxWorkUnit.get().getPropAsBoolean(END_OF_DATASETS_KEY, false)) { previousDatasetUrnWatermark = maxWorkUnit.get().getProp(DATASET_URN); previousPartitionUrnWatermark = maxWorkUnit.get().getProp(PARTITION_URN); } + return new BasicWorkUnitStream.Builder(getWorkUnitIterator(datasetStream.iterator(), previousDatasetUrnWatermark, + previousPartitionUrnWatermark, maximumWorkUnits)).setFiniteStream(true).build(); - IterableDatasetFinder datasetsFinder = createDatasetsFinder(state); - - Stream<Dataset> datasetStream = - datasetsFinder.getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator); - datasetStream = sortStreamLexicographically(datasetStream); - - return new BasicWorkUnitStream.Builder( - new DeepIterator(datasetStream.iterator(), previousDatasetUrnWatermark, previousPartitionUrnWatermark, - maxWorkUnits, config)).setFiniteStream(true).build(); } catch (IOException ioe) { throw new RuntimeException(ioe); } } /** + * A factory that creates the {@link WorkUnit} iterator backing the {@link WorkUnitStream}, given an iterator over the lexicographically sorted datasets. 
+ * @throws IOException if the {@link DeepIterator} cannot be constructed + */ + protected Iterator<WorkUnit> getWorkUnitIterator(Iterator<Dataset> datasetIterator, String previousDatasetUrnWatermark, + @Nullable String previousPartitionUrnWatermark, int maximumWorkUnits) throws IOException { + return new DeepIterator(datasetIterator, previousDatasetUrnWatermark, previousPartitionUrnWatermark, + maximumWorkUnits); + } + + /** * A deep iterator that advances input streams until the correct position, then possibly iterates over partitions * of {@link PartitionableDataset}s. */ - private class DeepIterator extends AbstractIterator<WorkUnit> { - private final Iterator<Dataset> baseIterator; - private final int maxWorkUnits; - + protected class DeepIterator extends AbstractIterator<WorkUnit> { + protected final Iterator<Dataset> baseIterator; + protected final int maxWorkUnits; + protected int generatedWorkUnits = 0; + protected Dataset previousDataset; private Iterator<PartitionableDataset.DatasetPartition> currentPartitionIterator; - private int generatedWorkUnits = 0; - private Dataset previousDataset; private PartitionableDataset.DatasetPartition previousPartition; public DeepIterator(Iterator<Dataset> baseIterator, String previousDatasetUrnWatermark, - String previousPartitionUrnWatermark, int maxWorkUnits, Config config) + String previousPartitionUrnWatermark, int maxWorkUnits) throws IOException { this.maxWorkUnits = maxWorkUnits; this.baseIterator = baseIterator; @@ -201,6 +206,21 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour return endOfData(); } + WorkUnit resultWU = doComputeNext(); + if (resultWU == null) { + resultWU = generateNoopWorkUnit(); + this.generatedWorkUnits = Integer.MAX_VALUE; + resultWU.setProp(END_OF_DATASETS_KEY, true); + } + return resultWU; + } + + /** + * An extensible method that generates the next work unit from the datasets supplied to the iterator created by {@link #getWorkUnitIterator}. 
+ * It reads {@link #baseIterator} and {@link #currentPartitionIterator} to determine the very next + * dataset/partition to convert into a work unit, and returns {@code null} once both are exhausted. + */ + protected WorkUnit doComputeNext() { while (this.baseIterator.hasNext() || this.currentPartitionIterator.hasNext()) { if (this.currentPartitionIterator != null && this.currentPartitionIterator.hasNext()) { PartitionableDataset.DatasetPartition partition = this.currentPartitionIterator.next(); @@ -215,7 +235,6 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour this.generatedWorkUnits++; return workUnit; } - Dataset dataset = this.baseIterator.next(); if (drilldownIntoPartitions && dataset instanceof PartitionableDataset) { this.currentPartitionIterator = getPartitionIterator((PartitionableDataset) dataset); @@ -230,13 +249,14 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour return workUnit; } } - WorkUnit workUnit = generateNoopWorkUnit(); - this.generatedWorkUnits = Integer.MAX_VALUE; - workUnit.setProp(END_OF_DATASETS_KEY, true); - return workUnit; + return null; } - private void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) { + /** + * A work unit does not necessarily correspond to a single {@link Dataset}, + * so this method is left overridable for subclasses. + */ + protected void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) { if (isDatasetStateStoreEnabled) { workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn()); }
