Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 74bcc0237 -> cf1ce7b24
[BEAM-2256] Add the last previous range filter Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0960c64 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0960c64 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0960c64 Branch: refs/heads/release-2.0.0 Commit: d0960c642d0bf0675f96e0495c7dbe303714c68d Parents: 74bcc02 Author: Jean-Baptiste Onofré <[email protected]> Authored: Thu May 11 22:09:50 2017 +0200 Committer: Luke Cwik <[email protected]> Committed: Thu May 11 15:16:14 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/mongodb/MongoDbIO.java | 47 ++++++++++++++------ .../beam/sdk/io/mongodb/MongoDbIOTest.java | 18 ++++++++ 2 files changed, 52 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d0960c64/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index 7236a50..620df74 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; import com.mongodb.BasicDBObject; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; @@ -184,7 +185,11 @@ public class MongoDbIO { } } - private static class BoundedMongoDbSource extends BoundedSource<Document> { + /** + * A MongoDB {@link 
BoundedSource} reading {@link Document} from a given instance. + */ + @VisibleForTesting + static class BoundedMongoDbSource extends BoundedSource<Document> { private Read spec; private BoundedMongoDbSource(Read spec) { @@ -294,7 +299,8 @@ public class MongoDbIO { * @param additionalFilter A custom (user) additional filter to append to the range filters. * @return A list of filters containing the ranges. */ - private static List<String> splitKeysToFilters(List<Document> splitKeys, String + @VisibleForTesting + static List<String> splitKeysToFilters(List<Document> splitKeys, String additionalFilter) { ArrayList<String> filters = new ArrayList<>(); String lowestBound = null; // lower boundary (previous split in the iteration) @@ -306,30 +312,45 @@ public class MongoDbIO { // the range from the beginning up to this split rangeFilter = String.format("{ $and: [ {\"_id\":{$lte:ObjectId(\"%s\")}}", splitKey); + filters.add(formatFilter(rangeFilter, additionalFilter)); } else if (i == splitKeys.size() - 1) { - // this is the last split in the list, the filter defines - // the range from the split up to the end + // this is the last split in the list, the filters define + // the range from the previous split to the current split and also + // the current split to the end + rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")," + + "$lte:ObjectId(\"%s\")}}", lowestBound, splitKey); + filters.add(formatFilter(rangeFilter, additionalFilter)); rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")}}", splitKey); + filters.add(formatFilter(rangeFilter, additionalFilter)); } else { // we are between two splits rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")," + "$lte:ObjectId(\"%s\")}}", lowestBound, splitKey); + filters.add(formatFilter(rangeFilter, additionalFilter)); } - if (additionalFilter != null && !additionalFilter.isEmpty()) { - // user provided a filter, we append the user filter to the range filter - rangeFilter = 
String.format("%s,%s ]}", rangeFilter, additionalFilter); - } else { - // user didn't provide a filter, just cleany close the range filter - rangeFilter = String.format("%s ]}", rangeFilter); - } - - filters.add(rangeFilter); lowestBound = splitKey; } return filters; } + + /** + * Cleanly format range filter, optionally adding the users filter if specified. + * + * @param filter The range filter. + * @param additionalFilter The users filter. Null if unspecified. + * @return The cleanly formatted range filter. + */ + private static String formatFilter(String filter, @Nullable String additionalFilter) { + if (additionalFilter != null && !additionalFilter.isEmpty()) { + // user provided a filter, we append the user filter to the range filter + return String.format("%s,%s ]}", filter, additionalFilter); + } else { + // user didn't provide a filter, just cleanly close the range filter + return String.format("%s ]}", filter); + } + } } private static class BoundedMongoDbReader extends BoundedSource.BoundedReader<Document> { http://git-wip-us.apache.org/repos/asf/beam/blob/d0960c64/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java index 454c6ba..cd26b48 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java @@ -38,6 +38,8 @@ import java.io.File; import java.io.Serializable; import java.net.ServerSocket; import java.util.ArrayList; +import java.util.List; + import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; @@ -139,6 +141,22 @@ public class MongoDbIOTest implements 
Serializable { } @Test + public void testSplitIntoFilters() throws Exception { + ArrayList<Document> documents = new ArrayList<>(); + documents.add(new Document("_id", 56)); + documents.add(new Document("_id", 109)); + documents.add(new Document("_id", 256)); + List<String> filters = MongoDbIO.BoundedMongoDbSource.splitKeysToFilters(documents, null); + assertEquals(4, filters.size()); + assertEquals("{ $and: [ {\"_id\":{$lte:ObjectId(\"56\")}} ]}", filters.get(0)); + assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"56\"),$lte:ObjectId(\"109\")}} ]}", + filters.get(1)); + assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"109\"),$lte:ObjectId(\"256\")}} ]}", + filters.get(2)); + assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"256\")}} ]}", filters.get(3)); + } + + @Test public void testFullRead() throws Exception { PCollection<Document> output = pipeline.apply(
