Repository: incubator-beam
Updated Branches:
  refs/heads/master 3879db036 -> 307d592d2
[BEAM-674] Add GridFS support to MongoDbIO

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/68c8c787
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/68c8c787
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/68c8c787

Branch: refs/heads/master
Commit: 68c8c7872720f4e8fbcd017032c0e90e395e905c
Parents: 3879db0
Author: Daniel Kulp <[email protected]>
Authored: Fri Sep 16 16:58:56 2016 -0400
Committer: Jean-Baptiste Onofré <[email protected]>
Committed: Wed Sep 28 17:18:46 2016 +0200

----------------------------------------------------------------------
 sdks/java/io/mongodb/pom.xml                    |   6 +-
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    | 427 +++++++++++++++++++
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java     | 257 +++++++++++
 3 files changed, 689 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68c8c787/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index 60f1d1e..b7e36af 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -89,6 +89,10 @@
       <artifactId>mongo-java-driver</artifactId>
       <version>${mongo-java-driver.version}</version>
     </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
 
     <!-- test dependencies -->
     <dependency>
@@ -126,4 +130,4 @@
     </dependency>
   </dependencies>
 
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68c8c787/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
new file mode 100644
index 0000000..337e5f5
--- /dev/null
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.mongodb.DB;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.Mongo;
+import com.mongodb.MongoURI;
+import com.mongodb.gridfs.GridFS;
+import com.mongodb.gridfs.GridFSDBFile;
+import com.mongodb.util.JSON;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.bson.types.ObjectId;
+import org.joda.time.Instant;
+
+
+/**
+ * IO to read and write data on MongoDB GridFS.
+ *
+ * <h3>Reading from MongoDB via GridFS</h3>
+ *
+ * <p>The MongoDbGridFSIO source returns a bounded collection of Strings as a
+ * {@code PCollection<String>}.</p>
+ *
+ * <p>To configure the MongoDB source, you have to provide the connection URI, the database name
+ * and the bucket name. The following example illustrates various options for configuring the
+ * source:</p>
+ *
+ * <pre>{@code
+ *
+ * pipeline.apply(MongoDbGridFSIO.read()
+ *   .withUri("mongodb://localhost:27017")
+ *   .withDatabase("my-database")
+ *   .withBucket("my-bucket"))
+ *
+ * }</pre>
+ *
+ * <p>The source also accepts an optional configuration: {@code withQueryFilter()} allows you to
+ * define a JSON filter to select a subset of the files in the database.</p>
+ *
+ * <p>An optional {@code ParseCallback} can also be specified to parse the InputStream of each
+ * file into objects usable with Beam. By default, MongoDbGridFSIO parses each file into Strings,
+ * splitting on line breaks and using the uploadDate of the file as the timestamp.</p>
+ */
+public class MongoDbGridFSIO {
+
+  /**
+   * Function for parsing the GridFSDBFile into objects for the PCollection.
+   * @param <T> the type each file is parsed into
+   */
+  public interface ParseCallback<T> extends Serializable {
+    /**
+     * Each value parsed from the file should be returned as a {@code Line<T>}
+     * from the {@link #parse} iterator. If the timestamp is omitted, the
+     * uploadDate of the GridFSDBFile is used instead.
+     */
+    class Line<T> {
+      final Instant timestamp;
+      final T value;
+
+      public Line(T value, Instant timestamp) {
+        this.value = value;
+        this.timestamp = timestamp;
+      }
+
+      public Line(T value) {
+        this.value = value;
+        this.timestamp = null;
+      }
+    }
+
+    Iterator<Line<T>> parse(GridFSDBFile input) throws IOException;
+  }
+
+  /**
+   * Default implementation that parses the InputStream into a collection of
+   * Strings, split on line breaks (CR/LF).
+   */
+  private static class StringsParseCallback implements ParseCallback<String> {
+    static final StringsParseCallback INSTANCE = new StringsParseCallback();
+
+    @Override
+    public Iterator<Line<String>> parse(final GridFSDBFile input) throws IOException {
+      final BufferedReader reader =
+          new BufferedReader(new InputStreamReader(input.getInputStream()));
+      return new Iterator<Line<String>>() {
+        String val = reader.readLine();
+
+        @Override
+        public boolean hasNext() {
+          return val != null;
+        }
+
+        @Override
+        public Line<String> next() {
+          Line<String> l = new Line<String>(val);
+          try {
+            val = reader.readLine();
+          } catch (IOException e) {
+            val = null;
+          }
+          return l;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException("Remove not supported");
+        }
+      };
+    }
+  }
+
+  /** Read data from GridFS. */
+  public static Read<String> read() {
+    return new Read<String>(new Read.BoundedGridFSSource<String>(null, null, null, null,
+        StringsParseCallback.INSTANCE, StringUtf8Coder.of()));
+  }
+
+  static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    public Read<T> withUri(String uri) {
+      return new Read<T>(new BoundedGridFSSource<T>(uri, options.database,
+          options.bucket, options.filterJson,
+          options.parser, options.coder));
+    }
+
+    public Read<T> withDatabase(String database) {
+      return new Read<T>(new BoundedGridFSSource<T>(options.uri, database,
+          options.bucket, options.filterJson,
+          options.parser, options.coder));
+    }
+
+    public Read<T> withBucket(String bucket) {
+      return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database, bucket,
+          options.filterJson, options.parser, options.coder));
+    }
+
+    public <X> Read<X> withParsingFn(ParseCallback<X> f) {
+      return new Read<X>(new BoundedGridFSSource<X>(options.uri, options.database,
+          options.bucket, options.filterJson, f, null));
+    }
+
+    public Read<T> withCoder(Coder<T> coder) {
+      return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database,
+          options.bucket, options.filterJson, options.parser, coder));
+    }
+
+    public Read<T> withQueryFilter(String filterJson) {
+      return new Read<T>(new BoundedGridFSSource<T>(options.uri, options.database,
+          options.bucket, filterJson, options.parser, options.coder));
+    }
+
+    private final BoundedGridFSSource<T> options;
+
+    Read(BoundedGridFSSource<T> options) {
+      this.options = options;
+    }
+
+    @Override
+    public PCollection<T> apply(PBegin input) {
+      org.apache.beam.sdk.io.Read.Bounded<T> bounded =
+          org.apache.beam.sdk.io.Read.from(options);
+      PCollection<T> output = input.getPipeline().apply(bounded);
+      if (options.coder != null) {
+        output.setCoder(options.coder);
+      }
+      return output;
+    }
+
+    static class BoundedGridFSSource<T> extends BoundedSource<T> {
+      @Nullable
+      private final String uri;
+      @Nullable
+      private final String database;
+      @Nullable
+      private final String bucket;
+      @Nullable
+      private final String filterJson;
+      @Nullable
+      private final ParseCallback<T> parser;
+      @Nullable
+      private final Coder<T> coder;
+      @Nullable
+      private List<ObjectId> objectIds;
+      private transient Mongo mongo;
+      private transient GridFS gridfs;
+
+      BoundedGridFSSource(String uri, String database, String bucket, String filterJson,
+          ParseCallback<T> parser, Coder<T> coder) {
+        this.uri = uri;
+        this.database = database;
+        this.bucket = bucket;
+        this.parser = parser;
+        this.coder = coder;
+        this.filterJson = filterJson;
+      }
+
+      BoundedGridFSSource(String uri, String database, String bucket, String filterJson,
+          ParseCallback<T> parser,
+          Coder<T> coder, List<ObjectId> objectIds) {
+        this.uri = uri;
+        this.database = database;
+        this.bucket = bucket;
+        this.parser = parser;
+        this.coder = coder;
+        this.objectIds = objectIds;
+        this.filterJson = filterJson;
+      }
+
+      private synchronized void setupGridFS() {
+        if (gridfs == null) {
+          mongo = uri == null ? new Mongo() : new Mongo(new MongoURI(uri));
+          DB db = database == null ? mongo.getDB("gridfs") : mongo.getDB(database);
+          gridfs = bucket == null ? new GridFS(db) : new GridFS(db, bucket);
+        }
+      }
+
+      private synchronized void closeGridFS() {
+        if (gridfs != null) {
+          gridfs = null;
+          mongo.close();
+          mongo = null;
+        }
+      }
+
+      @Override
+      public List<? extends BoundedSource<T>> splitIntoBundles(long desiredBundleSizeBytes,
+          PipelineOptions options) throws Exception {
+        try {
+          setupGridFS();
+          DBCursor cursor;
+          if (filterJson != null) {
+            DBObject query = (DBObject) JSON.parse(filterJson);
+            cursor = gridfs.getFileList(query).sort(null);
+          } else {
+            cursor = gridfs.getFileList().sort(null);
+          }
+          long size = 0;
+          List<BoundedGridFSSource<T>> list = new LinkedList<>();
+          List<ObjectId> objects = new LinkedList<>();
+          while (cursor.hasNext()) {
+            GridFSDBFile file = (GridFSDBFile) cursor.next();
+            long len = file.getLength();
+            if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) {
+              list.add(new BoundedGridFSSource<T>(uri, database, bucket, filterJson,
+                  parser, coder, objects));
+              size = 0;
+              objects = new LinkedList<>();
+            }
+            objects.add((ObjectId) file.getId());
+            size += len;
+          }
+          if (!objects.isEmpty() || list.isEmpty()) {
+            list.add(new BoundedGridFSSource<T>(uri, database, bucket, filterJson,
+                parser, coder, objects));
+          }
+          return list;
+        } finally {
+          closeGridFS();
+        }
+      }
+
+      @Override
+      public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+        try {
+          setupGridFS();
+          DBCursor cursor;
+          if (filterJson != null) {
+            DBObject query = (DBObject) JSON.parse(filterJson);
+            cursor = gridfs.getFileList(query).sort(null);
+          } else {
+            cursor = gridfs.getFileList().sort(null);
+          }
+          long size = 0;
+          while (cursor.hasNext()) {
+            GridFSDBFile file = (GridFSDBFile) cursor.next();
+            size += file.getLength();
+          }
+          return size;
+        } finally {
+          closeGridFS();
+        }
+      }
+
+      @Override
+      public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+        return false;
+      }
+
+      @Override
+      public org.apache.beam.sdk.io.BoundedSource.BoundedReader<T> createReader(
+          PipelineOptions options) throws IOException {
+        return new GridFSReader(this);
+      }
+
+      @Override
+      public void validate() {
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+        builder.addIfNotNull(DisplayData.item("uri", uri));
+        builder.addIfNotNull(DisplayData.item("database", database));
+        builder.addIfNotNull(DisplayData.item("bucket", bucket));
+        builder.addIfNotNull(DisplayData.item("filterJson", filterJson));
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public Coder<T> getDefaultOutputCoder() {
+        if (coder != null) {
+          return coder;
+        }
+        return (Coder<T>) SerializableCoder.of(Serializable.class);
+      }
+
+      class GridFSReader extends org.apache.beam.sdk.io.BoundedSource.BoundedReader<T> {
+        final BoundedGridFSSource<T> source;
+
+        Instant timestamp = Instant.now();
+        Iterator<ParseCallback.Line<T>> currentIterator;
+        ParseCallback.Line<T> currentLine;
+
+        GridFSReader(BoundedGridFSSource<T> source) {
+          this.source = source;
+        }
+
+        @Override
+        public boolean start() throws IOException {
+          setupGridFS();
+          if (objectIds == null) {
+            objectIds = new LinkedList<>();
+            DBCursor cursor = gridfs.getFileList().sort(null);
+            while (cursor.hasNext()) {
+              DBObject ob = cursor.next();
+              objectIds.add((ObjectId) ob.get("_id"));
+            }
+          }
+          return advance();
+        }
+
+        @Override
+        public boolean advance() throws IOException {
+          if (currentIterator != null && !currentIterator.hasNext()) {
+            objectIds.remove(0);
+            currentIterator = null;
+          }
+          if (currentIterator == null) {
+            if (objectIds.isEmpty()) {
+              return false;
+            }
+            ObjectId oid = objectIds.get(0);
+            GridFSDBFile file = gridfs.find(oid);
+            if (file == null) {
+              return false;
+            }
+            timestamp = new Instant(file.getUploadDate().getTime());
+            currentIterator = parser.parse(file);
+          }
+
+          if (currentIterator.hasNext()) {
+            currentLine = currentIterator.next();
+            return true;
+          }
+          return false;
+        }
+
+        @Override
+        public BoundedSource<T> getCurrentSource() {
+          return source;
+        }
+
+        @Override
+        public T getCurrent() throws NoSuchElementException {
+          if (currentLine != null) {
+            return currentLine.value;
+          }
+          throw new NoSuchElementException();
+        }
+
+        @Override
+        public Instant getCurrentTimestamp() throws NoSuchElementException {
+          if (currentLine != null) {
+            if (currentLine.timestamp != null) {
+              return currentLine.timestamp;
+            }
+            return timestamp;
+          }
+          throw new NoSuchElementException();
+        }
+
+        @Override
+        public void close() throws IOException {
+          closeGridFS();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/68c8c787/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
new file mode 100644
index 0000000..f8e5f77
--- /dev/null
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.junit.Assert.assertEquals;
+
+import com.mongodb.DB;
+import com.mongodb.Mongo;
+import com.mongodb.gridfs.GridFS;
+import com.mongodb.gridfs.GridFSDBFile;
+import com.mongodb.gridfs.GridFSInputFile;
+
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.IMongodConfig;
+import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.config.Storage;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.io.file.Files;
+import de.flapdoodle.embed.process.runtime.Network;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.Scanner;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO.ParseCallback;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Tests for MongoDbGridFSIO.
+ */
+public class MongoDBGridFSIOTest implements Serializable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBGridFSIOTest.class);
+
+  private static final String MONGODB_LOCATION = "target/mongodb";
+  private static final int PORT = 27017;
+  private static final String DATABASE = "gridfs";
+
+  private transient MongodExecutable mongodExecutable;
+
+  @Before
+  public void setup() throws Exception {
+    LOGGER.info("Starting MongoDB embedded instance");
+    try {
+      Files.forceDelete(new File(MONGODB_LOCATION));
+    } catch (Exception e) {
+      // ignore if the directory does not exist yet
+    }
+    new File(MONGODB_LOCATION).mkdirs();
+    IMongodConfig mongodConfig = new MongodConfigBuilder()
+        .version(Version.Main.PRODUCTION)
+        .configServer(false)
+        .replication(new Storage(MONGODB_LOCATION, null, 0))
+        .net(new Net("localhost", PORT, Network.localhostIsIPv6()))
+        .cmdOptions(new MongoCmdOptionsBuilder()
+            .syncDelay(10)
+            .useNoPrealloc(true)
+            .useSmallFiles(true)
+            .useNoJournal(true)
+            .build())
+        .build();
+    mongodExecutable = MongodStarter.getDefaultInstance().prepare(mongodConfig);
+    mongodExecutable.start();
+
+    LOGGER.info("Insert test data");
+
+    Mongo client = new Mongo("localhost", PORT);
+    DB database = client.getDB(DATABASE);
+    GridFS gridfs = new GridFS(database);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    for (int x = 0; x < 100; x++) {
+      out.write(("Einstein\nDarwin\nCopernicus\nPasteur\n"
+          + "Curie\nFaraday\nNewton\nBohr\nGalilei\nMaxwell\n").getBytes());
+    }
+    for (int x = 0; x < 5; x++) {
+      gridfs.createFile(new ByteArrayInputStream(out.toByteArray()), "file" + x).save();
+    }
+
+    gridfs = new GridFS(database, "mapBucket");
+    long now = System.currentTimeMillis();
+    Random random = new Random();
+    String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday",
+        "Newton", "Bohr", "Galilei", "Maxwell"};
+    for (int x = 0; x < 10; x++) {
+      GridFSInputFile file = gridfs.createFile("file_" + x);
+      OutputStream outf = file.getOutputStream();
+      OutputStreamWriter writer = new OutputStreamWriter(outf);
+      for (int y = 0; y < 5000; y++) {
+        long time = now - random.nextInt(3600000);
+        String name = scientists[y % scientists.length];
+        writer.write(Long.toString(time) + "\t");
+        writer.write(name + "\t");
+        writer.write(Integer.toString(random.nextInt(100)));
+        writer.write("\n");
+      }
+      for (int y = 0; y < scientists.length; y++) {
+        String name = scientists[y % scientists.length];
+        writer.write(Long.toString(now) + "\t");
+        writer.write(name + "\t");
+        writer.write("101");
+        writer.write("\n");
+      }
+      writer.flush();
+      writer.close();
+    }
+  }
+
+  @After
+  public void stop() throws Exception {
+    LOGGER.info("Stopping MongoDB instance");
+    mongodExecutable.stop();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testFullRead() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    PCollection<String> output = pipeline.apply(
+        MongoDbGridFSIO.read()
+            .withUri("mongodb://localhost:" + PORT)
+            .withDatabase(DATABASE));
+
+    PAssert.thatSingleton(
+        output.apply("Count All", Count.<String>globally()))
+        .isEqualTo(5000L);
+
+    PAssert.that(
+        output.apply("Count PerElement", Count.<String>perElement()))
+        .satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
+          @Override
+          public Void apply(Iterable<KV<String, Long>> input) {
+            for (KV<String, Long> element : input) {
+              assertEquals(500L, element.getValue().longValue());
+            }
+            return null;
+          }
+        });
+
+    pipeline.run();
+  }
+
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadWithParser() throws Exception {
+    TestPipeline pipeline = TestPipeline.create();
+
+    PCollection<KV<String, Integer>> output = pipeline.apply(
+        MongoDbGridFSIO.read()
+            .withUri("mongodb://localhost:" + PORT)
+            .withDatabase(DATABASE)
+            .withBucket("mapBucket")
+            .withParsingFn(new ParseCallback<KV<String, Integer>>() {
+              @Override
+              public Iterator<MongoDbGridFSIO.ParseCallback.Line<KV<String, Integer>>> parse(
+                  GridFSDBFile input) throws IOException {
+                final BufferedReader reader =
+                    new BufferedReader(new InputStreamReader(input.getInputStream()));
+                return new Iterator<Line<KV<String, Integer>>>() {
+                  String line = reader.readLine();
+
+                  @Override
+                  public boolean hasNext() {
+                    return line != null;
+                  }
+
+                  @Override
+                  public MongoDbGridFSIO.ParseCallback.Line<KV<String, Integer>> next() {
+                    try (Scanner scanner = new Scanner(line.trim())) {
+                      scanner.useDelimiter("\\t");
+                      long timestamp = scanner.nextLong();
+                      String name = scanner.next();
+                      int score = scanner.nextInt();
+
+                      try {
+                        line = reader.readLine();
+                      } catch (IOException e) {
+                        line = null;
+                      }
+                      if (line == null) {
+                        try {
+                          reader.close();
+                        } catch (IOException e) {
+                          // ignore
+                        }
+                      }
+                      return new Line<>(KV.of(name, score), new Instant(timestamp));
+                    }
+                  }
+
+                  @Override
+                  public void remove() {
+                  }
+                };
+              }
+            })).setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
+    PAssert.thatSingleton(output.apply("Count All", Count.<KV<String, Integer>>globally()))
+        .isEqualTo(50100L);
+
+    PAssert.that(output.apply("Max PerElement", Max.<String>integersPerKey()))
+        .satisfies(new SerializableFunction<Iterable<KV<String, Integer>>, Void>() {
+          @Override
+          public Void apply(Iterable<KV<String, Integer>> input) {
+            for (KV<String, Integer> element : input) {
+              assertEquals(101, element.getValue().longValue());
+            }
+            return null;
+          }
+        });
+
+    pipeline.run();
+  }
+
+}
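
----------------------------------------------------------------------

For reference, a minimal sketch of how the new source can be combined with the
optional withQueryFilter() option described in the class javadoc (no example in
this commit exercises it). The filter document and the wrapper class below are
illustrative assumptions, not part of the commit; and since the Read transform
is package-private here, the sketch assumes calling code lives in the
org.apache.beam.sdk.io.mongodb package, as the test above does.

    package org.apache.beam.sdk.io.mongodb;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.values.PCollection;

    /** Hypothetical usage sketch for the MongoDbGridFSIO.read() builder. */
    public class GridFSReadWithFilterExample {
      public static PCollection<String> readMatchingFiles(Pipeline pipeline) {
        return pipeline.apply(
            MongoDbGridFSIO.read()
                .withUri("mongodb://localhost:27017")
                .withDatabase("my-database")
                .withBucket("my-bucket")
                // Standard MongoDB query syntax against the GridFS files
                // collection: only read files whose name starts with "logs-"
                // (assumed filter, for illustration only).
                .withQueryFilter("{\"filename\": {\"$regex\": \"^logs-\"}}"));
      }
    }

With the defaults described in the class javadoc, each matched file is split on
line breaks and each line is emitted with the file's uploadDate as its element
timestamp.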
