Repository: beam Updated Branches: refs/heads/master 80c86f81b -> 27a596131
[BEAM-2787] Fix MongoDbIO exception when trying to write empty bundle Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fd274bac Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fd274bac Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fd274bac Branch: refs/heads/master Commit: fd274bac2386d4af79224c60ee214f3615dd7434 Parents: 80c86f8 Author: Ismaël MejÃa <[email protected]> Authored: Fri Sep 8 11:59:54 2017 +0200 Committer: Ismaël MejÃa <[email protected]> Committed: Fri Sep 8 12:47:25 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java | 7 ++++--- .../org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java | 11 +++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fd274bac/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index 087123a..d29f0ae 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -646,7 +646,7 @@ public class MongoDbIO { builder.add(DisplayData.item("batchSize", batchSize())); } - private static class WriteFn extends DoFn<Document, Void> { + static class WriteFn extends DoFn<Document, Void> { private final Write spec; private transient MongoClient client; private List<Document> batch; @@ -684,11 +684,12 @@ public class MongoDbIO { } private void flush() { + if (batch.isEmpty()) { + return; + } MongoDatabase mongoDatabase = client.getDatabase(spec.database()); MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection()); - mongoCollection.insertMany(batch); - batch.clear(); } http://git-wip-us.apache.org/repos/asf/beam/blob/fd274bac/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java index 67dbca4..a3fe063 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -270,4 +271,14 @@ public class MongoDbIOTest implements Serializable { } + @Test + public void testWriteEmptyCollection() throws Exception { + MongoDbIO.Write write = + MongoDbIO.write() + .withUri("mongodb://localhost:" + port) + .withDatabase("test") + .withCollection("empty"); + DoFnTester<Document, Void> fnTester = DoFnTester.of(new MongoDbIO.Write.WriteFn(write)); + fnTester.processBundle(new ArrayList<Document>()); + } }
