Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 249d5a19c -> 1c2d30b06


[GOBBLIN-492] Refactor LoopingDatasetFinderSource to make it extensible

Closes #2363 from
autumnust/refactorIteratorFactory


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1c2d30b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1c2d30b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1c2d30b0

Branch: refs/heads/master
Commit: 1c2d30b06752da7d6332631498feeeb75d424d80
Parents: 249d5a1
Author: Lei Sun <[email protected]>
Authored: Tue May 15 14:41:51 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Tue May 15 14:41:51 2018 -0700

----------------------------------------------------------------------
 .../source/LoopingDatasetFinderSource.java      | 74 +++++++++++++-------
 1 file changed, 47 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c2d30b0/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
index 50b24de..8e39bd2 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
@@ -30,7 +30,6 @@ import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.PeekingIterator;
-import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
@@ -44,7 +43,6 @@ import org.apache.gobblin.runtime.task.NoopTask;
 import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
-import org.apache.gobblin.util.ConfigUtils;
 
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
@@ -93,9 +91,8 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
   public WorkUnitStream getWorkunitStream(SourceState state, boolean 
isDatasetStateStoreEnabled) {
     this.isDatasetStateStoreEnabled = isDatasetStateStoreEnabled;
     try {
-      int maxWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, 
MAX_WORK_UNITS_PER_RUN);
-      Preconditions.checkArgument(maxWorkUnits > 0, "Max work units must be 
greater than 0!");
-      Config config = ConfigUtils.propertiesToConfig(state.getProperties());
+      int maximumWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, 
MAX_WORK_UNITS_PER_RUN);
+      Preconditions.checkArgument(maximumWorkUnits > 0, "Max work units must 
be greater than 0!");
 
       List<WorkUnitState> previousWorkUnitStates = 
(this.isDatasetStateStoreEnabled) ? state
           
.getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN)
@@ -109,42 +106,50 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
         }
       }
 
+      IterableDatasetFinder datasetsFinder = createDatasetsFinder(state);
+
+      Stream<Dataset> datasetStream =
+          datasetsFinder.getDatasetsStream(Spliterator.SORTED, 
this.lexicographicalComparator);
+      datasetStream = sortStreamLexicographically(datasetStream);
+
       String previousDatasetUrnWatermark = null;
       String previousPartitionUrnWatermark = null;
       if (maxWorkUnit.isPresent() && 
!maxWorkUnit.get().getPropAsBoolean(END_OF_DATASETS_KEY, false)) {
         previousDatasetUrnWatermark = maxWorkUnit.get().getProp(DATASET_URN);
         previousPartitionUrnWatermark = 
maxWorkUnit.get().getProp(PARTITION_URN);
       }
+      return new 
BasicWorkUnitStream.Builder(getWorkUnitIterator(datasetStream.iterator(), 
previousDatasetUrnWatermark,
+          previousPartitionUrnWatermark, 
maximumWorkUnits)).setFiniteStream(true).build();
 
-      IterableDatasetFinder datasetsFinder = createDatasetsFinder(state);
-
-      Stream<Dataset> datasetStream =
-          datasetsFinder.getDatasetsStream(Spliterator.SORTED, 
this.lexicographicalComparator);
-      datasetStream = sortStreamLexicographically(datasetStream);
-
-      return new BasicWorkUnitStream.Builder(
-          new DeepIterator(datasetStream.iterator(), 
previousDatasetUrnWatermark, previousPartitionUrnWatermark,
-              maxWorkUnits, config)).setFiniteStream(true).build();
     } catch (IOException ioe) {
       throw new RuntimeException(ioe);
     }
   }
 
   /**
+   * A factory method that generates an {@link Iterator} of {@link WorkUnit}s from a generic dataset iterator.
+   * @throws IOException
+   */
+  protected Iterator<WorkUnit> getWorkUnitIterator(Iterator<Dataset> 
datasetIterator, String previousDatasetUrnWatermark,
+      @Nullable String previousPartitionUrnWatermark, int maximumWorkUnits) 
throws IOException {
+    return new DeepIterator(datasetIterator, previousDatasetUrnWatermark, 
previousPartitionUrnWatermark,
+            maximumWorkUnits);
+  }
+
+  /**
    * A deep iterator that advances input streams until the correct position, 
then possibly iterates over partitions
    * of {@link PartitionableDataset}s.
    */
-  private class DeepIterator extends AbstractIterator<WorkUnit> {
-    private final Iterator<Dataset> baseIterator;
-    private final int maxWorkUnits;
-
+  protected class DeepIterator extends AbstractIterator<WorkUnit> {
+    protected final Iterator<Dataset> baseIterator;
+    protected final int maxWorkUnits;
+    protected int generatedWorkUnits = 0;
+    protected Dataset previousDataset;
     private Iterator<PartitionableDataset.DatasetPartition> 
currentPartitionIterator;
-    private int generatedWorkUnits = 0;
-    private Dataset previousDataset;
     private PartitionableDataset.DatasetPartition previousPartition;
 
     public DeepIterator(Iterator<Dataset> baseIterator, String 
previousDatasetUrnWatermark,
-        String previousPartitionUrnWatermark, int maxWorkUnits, Config config)
+        String previousPartitionUrnWatermark, int maxWorkUnits)
         throws IOException {
       this.maxWorkUnits = maxWorkUnits;
       this.baseIterator = baseIterator;
@@ -201,6 +206,21 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
         return endOfData();
       }
 
+      WorkUnit resultWU = doComputeNext();
+      if (resultWU == null) {
+        resultWU = generateNoopWorkUnit();
+        this.generatedWorkUnits = Integer.MAX_VALUE;
+        resultWU.setProp(END_OF_DATASETS_KEY, true);
+      }
+      return resultWU;
+    }
+
+    /**
+     * An extensible method that generates a workunit based on the Iterator returned by {@link #getWorkUnitIterator}.
+     * It interacts with {@link #baseIterator} and {@link 
#currentPartitionIterator} to know the very next
+     * dataset/partition to be converted into a workunit.
+     */
+    protected WorkUnit doComputeNext() {
       while (this.baseIterator.hasNext() || 
this.currentPartitionIterator.hasNext()) {
         if (this.currentPartitionIterator != null && 
this.currentPartitionIterator.hasNext()) {
           PartitionableDataset.DatasetPartition partition = 
this.currentPartitionIterator.next();
@@ -215,7 +235,6 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
           this.generatedWorkUnits++;
           return workUnit;
         }
-
         Dataset dataset = this.baseIterator.next();
         if (drilldownIntoPartitions && dataset instanceof 
PartitionableDataset) {
           this.currentPartitionIterator = 
getPartitionIterator((PartitionableDataset) dataset);
@@ -230,13 +249,14 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
           return workUnit;
         }
       }
-      WorkUnit workUnit = generateNoopWorkUnit();
-      this.generatedWorkUnits = Integer.MAX_VALUE;
-      workUnit.setProp(END_OF_DATASETS_KEY, true);
-      return workUnit;
+      return null;
     }
 
-    private void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) {
+    /**
+     * It is not necessarily the case that each workunit corresponds to a single {@link Dataset},
+     * thus we make this method extensible.
+     */
+    protected void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset 
dataset) {
       if (isDatasetStateStoreEnabled) {
         workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
       }

Reply via email to