Repository: incubator-gobblin Updated Branches: refs/heads/master b39bf8cab -> d6f0112a9
[GOBBLIN-471] Skip null work units in DatasetFinderSource and LoopingDatasetFinderSource. Closes #2344 from ibuenros/datasetfindersource-skipnull Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d6f0112a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d6f0112a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d6f0112a Branch: refs/heads/master Commit: d6f0112a99dd9fb6e973f1999dd196c39a9d3ee4 Parents: b39bf8c Author: ibuenros <[email protected]> Authored: Mon Apr 23 16:17:15 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Apr 23 16:17:15 2018 -0700 ---------------------------------------------------------------------- .../gobblin/data/management/source/DatasetFinderSource.java | 5 +++-- .../data/management/source/LoopingDatasetFinderSource.java | 6 ++++++ 2 files changed, 9 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6f0112a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java index 38fc7e2..77f1d15 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java @@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.source; import java.io.IOException; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -110,9 +111,9 @@ 
public abstract class DatasetFinderSource<S, D> implements WorkUnitStreamSource< } else { return Stream.of(new DatasetWrapper(dataset)); } - }).map(this::workUnitForPartitionInternal); + }).map(this::workUnitForPartitionInternal).filter(Objects::nonNull); } else { - return datasetStream.map(this::workUnitForDataset); + return datasetStream.map(this::workUnitForDataset).filter(Objects::nonNull); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6f0112a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java index 355f463..50b24de 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java @@ -205,6 +205,9 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour if (this.currentPartitionIterator != null && this.currentPartitionIterator.hasNext()) { PartitionableDataset.DatasetPartition partition = this.currentPartitionIterator.next(); WorkUnit workUnit = workUnitForDatasetPartition(partition); + if (workUnit == null) { + continue; + } addDatasetInfoToWorkUnit(workUnit, partition.getDataset()); addPartitionInfoToWorkUnit(workUnit, partition); this.previousDataset = partition.getDataset(); @@ -218,6 +221,9 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour this.currentPartitionIterator = getPartitionIterator((PartitionableDataset) dataset); } else { WorkUnit workUnit = workUnitForDataset(dataset); + if (workUnit == null) { + continue; + } 
addDatasetInfoToWorkUnit(workUnit, dataset); this.previousDataset = dataset; this.generatedWorkUnits++;
