[BEAM-456] Add MongoDbIO
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ae5cc73 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ae5cc73 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ae5cc73 Branch: refs/heads/gearpump-runner Commit: 9ae5cc7310137e928d353eba0104dfd7ae263a96 Parents: 3f48566 Author: Jean-Baptiste Onofré <jbono...@apache.org> Authored: Fri Jul 15 18:44:26 2016 +0200 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 12 17:40:12 2016 -0700 ---------------------------------------------------------------------- sdks/java/io/mongodb/pom.xml | 129 +++++ .../apache/beam/sdk/io/mongodb/MongoDbIO.java | 553 +++++++++++++++++++ .../beam/sdk/io/mongodb/package-info.java | 22 + .../beam/sdk/io/mongodb/MongoDbIOTest.java | 209 +++++++ .../beam/sdk/io/mongodb/package-info.java | 22 + sdks/java/io/pom.xml | 1 + 6 files changed, 936 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ae5cc73/sdks/java/io/mongodb/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml new file mode 100644 index 0000000..60f1d1e --- /dev/null +++ b/sdks/java/io/mongodb/pom.xml @@ -0,0 +1,129 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-parent</artifactId> + <version>0.3.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-io-mongodb</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: MongoDB</name> + <description>IO to read from and write to MongoDB.</description> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + </plugin> + </plugins> + </build> + + <properties> + <mongo-java-driver.version>3.2.2</mongo-java-driver.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>annotations</artifactId> + </dependency> + + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + <version>${mongo-java-driver.version}</version> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>de.flapdoodle.embed</groupId> + <artifactId>de.flapdoodle.embed.mongo</artifactId> + <version>1.50.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>de.flapdoodle.embed</groupId> + <artifactId>de.flapdoodle.embed.process</artifactId> + <version>1.50.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ae5cc73/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java new file mode 100644 index 0000000..7724614 --- /dev/null +++
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import com.mongodb.BasicDBObject; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; + +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.Nullable; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +import org.bson.Document; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * IO to read data from and write data to MongoDB. + * + * <h3>Reading from MongoDB</h3> + * + * <p>The MongoDbIO source returns a bounded collection of Strings as {@code PCollection<String>}. + * Each String is the JSON form of a MongoDB Document.</p> + * + * <p>To configure the MongoDB source, you have to provide the connection URI, the database name + * and the collection name. The following example illustrates various options for configuring the + * source:</p> + * + * <pre>{@code + * + * pipeline.apply(MongoDbIO.read() + * .withUri("mongodb://localhost:27017") + * .withDatabase("my-database") + * .withCollection("my-collection")) + * // above three are required configuration, returns PCollection<String> + * + * // rest of the settings are optional + * + * }</pre> + * + * <p>The source also accepts an optional configuration: {@code withFilter()} allows you to + * define a JSON filter to get a subset of the data.</p> + * + * <h3>Writing to MongoDB</h3> + * + * <p>The MongoDB sink supports writing Documents (as JSON Strings) to a MongoDB collection.</p> + * + * <p>To configure a MongoDB sink, you must specify a connection {@code URI}, a {@code Database} + * name, and a {@code Collection} name. For instance:</p> + * + * <pre>{@code + * + * pipeline + * .apply(...)
+ * .apply(MongoDbIO.write() + * .withUri("mongodb://localhost:27017") + * .withDatabase("my-database") + * .withCollection("my-collection") + * .withBatchSize(1024L)) + * + * }</pre> + */ +// TODO instead of JSON String, does it make sense to populate the PCollection with BSON Document or +// DBObject? +public class MongoDbIO { + + private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbIO.class); + + /** Read data from MongoDB. */ + public static Read read() { + return new Read(new BoundedMongoDbSource(null, null, null, null, 0)); + } + + /** Write data to MongoDB. */ + public static Write write() { + return new Write(new Write.MongoDbWriter(null, null, null, 1024L)); + } + + private MongoDbIO() { + } + + /** + * A {@link PTransform} to read data from MongoDB. + */ + public static class Read extends PTransform<PBegin, PCollection<String>> { + + public Read withUri(String uri) { + return new Read(source.withUri(uri)); + } + + public Read withDatabase(String database) { + return new Read(source.withDatabase(database)); + } + + public Read withCollection(String collection) { + return new Read(source.withCollection(collection)); + } + + public Read withFilter(String filter) { + return new Read(source.withFilter(filter)); + } + + public Read withNumSplits(int numSplits) { + return new Read(source.withNumSplits(numSplits)); + } + + private final BoundedMongoDbSource source; + + private Read(BoundedMongoDbSource source) { + this.source = source; + } + + @Override + public PCollection<String> apply(PBegin input) { + return input.apply(org.apache.beam.sdk.io.Read.from(getSource())); + } + + /** + * Creates a {@link BoundedSource} with the configuration in {@link Read}. + */ + @VisibleForTesting + BoundedSource<String> getSource() { + return source; + } + + @Override + public void validate(PBegin input) { + source.validate(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + source.populateDisplayData(builder); + } + + } + + private static class BoundedMongoDbSource extends BoundedSource<String> { + + public BoundedMongoDbSource withUri(String uri) { + return new BoundedMongoDbSource(uri, database, collection, filter, numSplits); + } + + public BoundedMongoDbSource withDatabase(String database) { + return new BoundedMongoDbSource(uri, database, collection, filter, numSplits); + } + + public BoundedMongoDbSource withCollection(String collection) { + return new BoundedMongoDbSource(uri, database, collection, filter, numSplits); + } + + public BoundedMongoDbSource withFilter(String filter) { + return new BoundedMongoDbSource(uri, database, collection, filter, numSplits); + } + + public BoundedMongoDbSource withNumSplits(int numSplits) { + return new BoundedMongoDbSource(uri, database, collection, filter, numSplits); + } + + private final String uri; + private final String database; + private final String collection; + @Nullable + private final String filter; + private final int numSplits; + + public BoundedMongoDbSource(String uri, String database, String collection, String filter, + int numSplits) { + this.uri = uri; + this.database = database; + this.collection = collection; + this.filter = filter; + this.numSplits = numSplits; + } + + @Override + public Coder getDefaultOutputCoder() { + return SerializableCoder.of(String.class); + } + + @Override + public void validate() { + Preconditions.checkNotNull(uri, "uri"); + Preconditions.checkNotNull(database, "database"); + Preconditions.checkNotNull(collection,
"collection"); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("uri", uri)); + builder.add(DisplayData.item("database", database)); + builder.add(DisplayData.item("collection", collection)); + builder.addIfNotNull(DisplayData.item("filter", filter)); + builder.add(DisplayData.item("numSplit", numSplits)); + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) { + return false; + } + + @Override + public BoundedReader createReader(PipelineOptions options) { + return new BoundedMongoDbReader(this); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) { + long estimatedSizeBytes = 0L; + + MongoClient mongoClient = new MongoClient(); + MongoDatabase mongoDatabase = mongoClient.getDatabase(database); + MongoCollection mongoCollection = mongoDatabase.getCollection(collection); + + // get the Mongo collStats object + // it gives the size for the entire collection + BasicDBObject stat = new BasicDBObject(); + stat.append("collStats", collection); + Document stats = mongoDatabase.runCommand(stat); + estimatedSizeBytes = Long.valueOf(stats.get("size").toString()); + return estimatedSizeBytes; + } + + @Override + public List<BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes, + PipelineOptions options) { + MongoClient mongoClient = new MongoClient(); + MongoDatabase mongoDatabase = mongoClient.getDatabase(database); + + List<Document> splitKeys = null; + if (numSplits > 0) { + // the user defines his desired number of splits + // calculate the batch size + long estimatedSizeBytes = getEstimatedSizeBytes(options); + desiredBundleSizeBytes = estimatedSizeBytes / numSplits; + } + + // the desired batch size is small, using default chunk size of 1MB + if (desiredBundleSizeBytes < 1024 * 1024) { + desiredBundleSizeBytes = 1 * 1024 * 1024; + } + + // now we have the batch size (provided by user or provided by the runner) + // we use Mongo splitVector command to get the split keys + BasicDBObject splitVectorCommand = new BasicDBObject(); + splitVectorCommand.append("splitVector", database + "." + collection); + splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1)); + splitVectorCommand.append("force", false); + // maxChunkSize is the Mongo partition size in MB + LOGGER.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024); + splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024); + Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand); + splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys"); + + List<BoundedSource<String>> sources = new ArrayList<>(); + if (splitKeys.size() < 1) { + LOGGER.debug("Split keys is low, using an unique source"); + sources.add(this); + return sources; + } + + LOGGER.debug("Number of splits is {}", splitKeys.size()); + for (String shardFilter : splitKeysToFilters(splitKeys, filter)) { + sources.add(this.withFilter(shardFilter)); + } + + return sources; + } + + /** + * Transform a list of split keys as a list of filters containing corresponding range. 
+ * + * <p>The list of split keys contains BSON Documents, for example: + * <ul> + * <li>_id: 56</li> + * <li>_id: 109</li> + * <li>_id: 256</li> + * </ul> + * + * This method will generate a list of range filters performing the following splits: + * <ul> + * <li>from the beginning of the collection up to _id 56, so basically data with + * _id lower than or equal to 56</li> + * <li>from _id 57 up to _id 109</li> + * <li>from _id 110 up to _id 256</li> + * <li>from _id 257 up to the end of the collection, so basically data with _id greater + * than 256</li> + * </ul> + * + * @param splitKeys The list of split keys. + * @param additionalFilter A custom (user) additional filter to append to the range filters. + * @return A list of filters containing the ranges. + */ + private static List<String> splitKeysToFilters(List<Document> splitKeys, String + additionalFilter) { + ArrayList<String> filters = new ArrayList<>(); + String lowestBound = null; // lower boundary (previous split in the iteration) + // n split keys define n + 1 ranges, so we iterate one step past the last split key + for (int i = 0; i <= splitKeys.size(); i++) { + String rangeFilter; + if (i == 0) { + // this is the first range in the list, the filter defines + // the range from the beginning up to the first split key + rangeFilter = String.format("{ $and: [ {\"_id\":{$lte:ObjectId(\"%s\")}}", + splitKeys.get(i).toString()); + } else if (i == splitKeys.size()) { + // this is the last range in the list, the filter defines + // the range from the last split key up to the end + rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")}}", + lowestBound); + } else { + // we are between two splits + rangeFilter = String.format("{ $and: [ {\"_id\":{$gt:ObjectId(\"%s\")," + + "$lte:ObjectId(\"%s\")}}", lowestBound, splitKeys.get(i).toString()); + } + if (additionalFilter != null && !additionalFilter.isEmpty()) { + // the user provided a filter, we append the user filter to the range filter + rangeFilter = String.format("%s,%s ]}", rangeFilter, additionalFilter); + } else { + // the user didn't provide a filter, just cleanly close the range filter + rangeFilter = String.format("%s ]}", rangeFilter); + } + + filters.add(rangeFilter); + + if (i < splitKeys.size()) { + lowestBound = splitKeys.get(i).toString(); + } + } + return filters; + } + } + + private static class BoundedMongoDbReader extends BoundedSource.BoundedReader<String> { + + private final BoundedMongoDbSource source; + + private MongoClient client; + private MongoCursor<Document> cursor; + private String current; + + public BoundedMongoDbReader(BoundedMongoDbSource source) { + this.source = source; + } + + @Override + public boolean start() { + client = new MongoClient(new MongoClientURI(source.uri)); + + MongoDatabase mongoDatabase = client.getDatabase(source.database); + + MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(source.collection); + + if (source.filter == null) { + cursor = mongoCollection.find().iterator(); + } else { + Document bson = Document.parse(source.filter); + cursor = mongoCollection.find(bson).iterator(); + } + + return advance(); + } + + @Override + public boolean advance() { + if (cursor.hasNext()) { + current = cursor.next().toJson(); + return true; + } else { + return false; + } + } + + @Override + public BoundedSource getCurrentSource() { + return source; + } + + @Override + public String getCurrent() { + return current; + } + + @Override + public void close() { + try { + if (cursor != null) { + cursor.close(); + } + } catch (Exception e) { + LOGGER.warn("Error closing MongoDB cursor", e); + } + try { + client.close(); + } catch (Exception e)
{ + LOGGER.warn("Error closing MongoDB client", e); + } + } + + } + + /** + * A {@link PTransform} to write to a MongoDB database. + */ + public static class Write extends PTransform<PCollection<String>, PDone> { + + public Write withUri(String uri) { + return new Write(writer.withUri(uri)); + } + + public Write withDatabase(String database) { + return new Write(writer.withDatabase(database)); + } + + public Write withCollection(String collection) { + return new Write(writer.withCollection(collection)); + } + + public Write withBatchSize(long batchSize) { + return new Write(writer.withBatchSize(batchSize)); + } + + private final MongoDbWriter writer; + + private Write(MongoDbWriter writer) { + this.writer = writer; + } + + @Override + public PDone apply(PCollection<String> input) { + input.apply(ParDo.of(writer)); + return PDone.in(input.getPipeline()); + } + + @Override + public void validate(PCollection<String> input) { + writer.validate(); + } + + private static class MongoDbWriter extends DoFn<String, Void> { + + private final String uri; + private final String database; + private final String collection; + private final long batchSize; + + private MongoClient client; + private List<Document> batch; + + public MongoDbWriter(String uri, String database, String collection, long batchSize) { + this.uri = uri; + this.database = database; + this.collection = collection; + this.batchSize = batchSize; + } + + public MongoDbWriter withUri(String uri) { + return new MongoDbWriter(uri, database, collection, batchSize); + } + + public MongoDbWriter withDatabase(String database) { + return new MongoDbWriter(uri, database, collection, batchSize); + } + + public MongoDbWriter withCollection(String collection) { + return new MongoDbWriter(uri, database, collection, batchSize); + } + + public MongoDbWriter withBatchSize(long batchSize) { + return new MongoDbWriter(uri, database, collection, batchSize); + } + + public void validate() { + Preconditions.checkNotNull(uri, "uri"); + Preconditions.checkNotNull(database, "database"); + Preconditions.checkNotNull(collection, "collection"); + Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0"); + } + + @Setup + public void createMongoClient() throws Exception { + client = new MongoClient(new MongoClientURI(uri)); + } + + @StartBundle + public void startBundle(Context ctx) throws Exception { + batch = new ArrayList<>(); + } + + @ProcessElement + public void processElement(ProcessContext ctx) throws Exception { + batch.add(Document.parse(ctx.element())); + if (batch.size() >= batchSize) { + finishBundle(ctx); + } + } + + @FinishBundle + public void finishBundle(Context ctx) throws Exception { + // nothing to flush: the driver rejects insertMany() with an empty list + if (batch.isEmpty()) { + return; + } + + MongoDatabase mongoDatabase = client.getDatabase(database); + MongoCollection mongoCollection = mongoDatabase.getCollection(collection); + + mongoCollection.insertMany(batch); + + batch.clear(); + } + + @Teardown + public void closeMongoClient() throws Exception { + client.close(); + client = null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ae5cc73/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java new file mode 100644 index 0000000..fd08b58 --- /dev/null +++
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Transforms for reading from and writing to MongoDB. + */ +package org.apache.beam.sdk.io.mongodb; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ae5cc73/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java new file mode 100644 index 0000000..308e071 --- /dev/null +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.mongodb; + +import static org.junit.Assert.assertEquals; + +import com.mongodb.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; + +import de.flapdoodle.embed.mongo.MongodExecutable; +import de.flapdoodle.embed.mongo.MongodStarter; +import de.flapdoodle.embed.mongo.config.IMongodConfig; +import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder; +import de.flapdoodle.embed.mongo.config.MongodConfigBuilder; +import de.flapdoodle.embed.mongo.config.Net; +import de.flapdoodle.embed.mongo.config.Storage; +import de.flapdoodle.embed.mongo.distribution.Version; +import de.flapdoodle.embed.process.io.file.Files; +import de.flapdoodle.embed.process.runtime.Network; + +import java.io.File; +import java.io.Serializable; +import java.util.ArrayList; + +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import org.bson.Document; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for MongoDbIO. + */ +public class MongoDbIOTest implements Serializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbIOTest.class); + + private static final String MONGODB_LOCATION = "target/mongodb"; + private static final int PORT = 27017; + private static final String DATABASE = "beam"; + private static final String COLLECTION = "test"; + + private transient MongodExecutable mongodExecutable; + + @Before + public void setup() throws Exception { + LOGGER.info("Starting MongoDB embedded instance"); + try { + Files.forceDelete(new File(MONGODB_LOCATION)); + } catch (Exception e) { + // ignore: the location may not exist yet + } + new File(MONGODB_LOCATION).mkdirs(); + IMongodConfig mongodConfig = new MongodConfigBuilder() + .version(Version.Main.PRODUCTION) + .configServer(false) + .replication(new Storage(MONGODB_LOCATION, null, 0)) + .net(new Net("localhost", PORT, Network.localhostIsIPv6())) + .cmdOptions(new MongoCmdOptionsBuilder() + .syncDelay(10) + .useNoPrealloc(true) + .useSmallFiles(true) + .useNoJournal(true) + .build()) + .build(); + mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig); + mongodExecutable.start(); + + LOGGER.info("Insert test data"); + + MongoClient client = new MongoClient("localhost", PORT); + MongoDatabase database = client.getDatabase(DATABASE); + + MongoCollection collection = database.getCollection(COLLECTION); + + String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", + "Newton", "Bohr", "Galilei", "Maxwell"}; + for (int i = 1; i <= 1000; i++) { + int index = i % scientists.length; + Document document = new Document(); + document.append("_id", i); + document.append("scientist", scientists[index]); + collection.insertOne(document); + } + client.close(); + } + + @After + public void stop() throws Exception { + LOGGER.info("Stopping MongoDB instance"); + mongodExecutable.stop(); + } + + @Test + @Category(NeedsRunner.class) +
public void testFullRead() throws Exception { + TestPipeline pipeline = TestPipeline.create(); + + PCollection<String> output = pipeline.apply( + MongoDbIO.read() + .withUri("mongodb://localhost:" + PORT) + .withDatabase(DATABASE) + .withCollection(COLLECTION)); + + PAssert.thatSingleton(output.apply("Count All", Count.<String>globally())) + .isEqualTo(1000L); + + PAssert.that(output + .apply("Map Scientist", MapElements.via(new SimpleFunction<String, KV<String, Void>>() { + @Override + public KV<String, Void> apply(String input) { + Document bson = Document.parse(input); + return KV.of(bson.getString("scientist"), null); + } + })) + .apply("Count Scientist", Count.<String, Void>perKey()) + ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() { + @Override + public Void apply(Iterable<KV<String, Long>> input) { + for (KV<String, Long> element : input) { + assertEquals(100L, element.getValue().longValue()); + } + return null; + } + }); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testReadWithFilter() throws Exception { + TestPipeline pipeline = TestPipeline.create(); + + PCollection<String> output = pipeline.apply( + MongoDbIO.read() + .withUri("mongodb://localhost:" + PORT) + .withDatabase(DATABASE) + .withCollection(COLLECTION) + .withFilter("{\"scientist\":\"Einstein\"}")); + + PAssert.thatSingleton(output.apply("Count", Count.<String>globally())) + .isEqualTo(100L); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testWrite() throws Exception { + TestPipeline pipeline = TestPipeline.create(); + + ArrayList<String> data = new ArrayList<>(); + for (int i = 0; i < 10000; i++) { + data.add(String.format("{\"scientist\":\"Test %s\"}", i)); + } + pipeline.apply(Create.of(data)) + .apply(MongoDbIO.write().withUri("mongodb://localhost:" + PORT).withDatabase("test") + .withCollection("test")); + + pipeline.run(); + + MongoClient client = new MongoClient("localhost", PORT); + MongoDatabase database = client.getDatabase("test"); + MongoCollection collection = database.getCollection("test"); + + MongoCursor cursor = collection.find().iterator(); + + int count = 0; + while (cursor.hasNext()) { + count++; + cursor.next(); + } + cursor.close(); + client.close(); + + Assert.assertEquals(10000, count); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ae5cc73/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java new file mode 100644 index 0000000..fd08b58 --- /dev/null +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Transforms for reading from and writing to MongoDB. + */ +package org.apache.beam.sdk.io.mongodb; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ae5cc73/sdks/java/io/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index 6cbd615..c4c32ed 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -38,6 +38,7 @@ <module>jms</module> <module>kafka</module> <module>kinesis</module> + <module>mongodb</module> </modules> </project>
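----------------------------------------------------------------------

Usage sketch: the snippet below wires the new IO into a complete pipeline, based only on the API added in this commit. The connection URI, database/collection names, and the filter value are illustrative; withFilter() and withBatchSize() are optional (1024 is the writer's default batch size).

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class MongoDbIOExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // read: each element is the JSON form of a MongoDB document
    PCollection<String> documents = pipeline.apply(
        MongoDbIO.read()
            .withUri("mongodb://localhost:27017")            // illustrative connection URI
            .withDatabase("my-database")                     // illustrative database name
            .withCollection("my-collection")                 // illustrative collection name
            .withFilter("{\"scientist\":\"Einstein\"}"));    // optional JSON filter

    // write: each element must be a JSON string parseable as a MongoDB document
    documents.apply(
        MongoDbIO.write()
            .withUri("mongodb://localhost:27017")
            .withDatabase("my-database")
            .withCollection("my-other-collection")           // illustrative target collection
            .withBatchSize(1024L));                          // optional, 1024 is the default

    pipeline.run();
  }
}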