Repository: storm Updated Branches: refs/heads/1.x-branch 53e1ab0c6 -> eeeb7b9c3
STORM-1483: add storm-mongodb connector Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/45fe4595 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/45fe4595 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/45fe4595 Branch: refs/heads/1.x-branch Commit: 45fe4595c64bb7772cafd3d4bd9923b2429e835b Parents: f0abfff Author: Xin Wang <[email protected]> Authored: Sun Jan 24 22:05:23 2016 +0800 Committer: vesense <[email protected]> Committed: Wed Mar 16 14:51:13 2016 +0800 ---------------------------------------------------------------------- external/storm-mongodb/README.md | 195 +++++++++++++++++++ external/storm-mongodb/pom.xml | 74 +++++++ .../storm/mongodb/bolt/AbstractMongoBolt.java | 56 ++++++ .../storm/mongodb/bolt/MongoInsertBolt.java | 62 ++++++ .../storm/mongodb/bolt/MongoUpdateBolt.java | 75 +++++++ .../storm/mongodb/common/MongoDBClient.java | 91 +++++++++ .../mongodb/common/QueryFilterCreator.java | 38 ++++ .../common/SimpleQueryFilterCreator.java | 39 ++++ .../mongodb/common/mapper/MongoMapper.java | 38 ++++ .../common/mapper/SimpleMongoMapper.java | 40 ++++ .../common/mapper/SimpleMongoUpdateMapper.java | 41 ++++ .../storm/mongodb/trident/state/MongoState.java | 97 +++++++++ .../trident/state/MongoStateFactory.java | 42 ++++ .../trident/state/MongoStateUpdater.java | 34 ++++ .../storm/mongodb/topology/InsertWordCount.java | 81 ++++++++ .../storm/mongodb/topology/UpdateWordCount.java | 91 +++++++++ .../storm/mongodb/topology/WordCounter.java | 67 +++++++ .../storm/mongodb/topology/WordSpout.java | 88 +++++++++ .../storm/mongodb/trident/WordCountTrident.java | 85 ++++++++ pom.xml | 1 + storm-dist/binary/src/main/assembly/binary.xml | 14 ++ 21 files changed, 1349 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/README.md ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md new file mode 100644 index 0000000..614b52f --- /dev/null +++ b/external/storm-mongodb/README.md @@ -0,0 +1,195 @@ +#Storm MongoDB + +Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This package includes the core bolts and trident states that allows a storm topology to either insert storm tuples in a database collection or to execute update queries against a database collection in a storm topology. + +## Insert into Database +The bolt and trident state included in this package for inserting data into a database collection. + +### MongoMapper +The main API for inserting data in a collection using MongoDB is the `org.apache.storm.mongodb.common.mapper.MongoMapper` interface: + +```java +public interface MongoMapper extends Serializable { + Document toDocument(ITuple tuple); +} +``` + +### SimpleMongoMapper +`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoMapper` that can map Storm tuple to a Database document. `SimpleMongoMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to. + +```java +public class SimpleMongoMapper implements MongoMapper { + private String[] fields; + + @Override + public Document toDocument(ITuple tuple) { + Document document = new Document(); + for(String field : fields){ + document.append(field, tuple.getValueByField(field)); + } + return document; + } + + public SimpleMongoMapper withFields(String... fields) { + this.fields = fields; + return this; + } +} +``` + +### MongoInsertBolt +To use the `MongoInsertBolt`, you construct an instance of it by specifying url, collectionName and a `MongoMapper` implementation that converts storm tuple to DB document. The following is the standard URI connection scheme: + `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]` + +More options information(eg: Write Concern Options) about Mongo URI, you can visit https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options + + ```java +String url = "mongodb://127.0.0.1:27017/test"; +String collectionName = "wordcount"; + +MongoMapper mapper = new SimpleMongoMapper() + .withFields("word", "count"); + +MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper); + ``` + +### MongoTridentState +We also support a trident persistent state that can be used with trident topologies. To create a Mongo persistent trident state you need to initialize it with the url, collectionName, the `MongoMapper` instance. See the example below: + + ```java + MongoMapper mapper = new SimpleMongoMapper() + .withFields("word", "count"); + + MongoState.Options options = new MongoState.Options() + .withUrl(url) + .withCollectionName(collectionName) + .withMapper(mapper); + + StateFactory factory = new MongoStateFactory(options); + + TridentTopology topology = new TridentTopology(); + Stream stream = topology.newStream("spout1", spout); + + stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields()); + ``` + **NOTE**: + >If there is no unique index provided, trident state inserts in the case of failures may result in duplicate documents. + +## Update from Database +The bolt included in this package for updating data from a database collection. + +### SimpleMongoUpdateMapper +`storm-mongodb` includes a general purpose `MongoMapper` implementation called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields with same name as the document field name in the database collection that you intend to write to. +`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a field in a document. More information about update operator, you can visit +https://docs.mongodb.org/manual/reference/operator/update/ + +```java +public class SimpleMongoUpdateMapper implements MongoMapper { + private String[] fields; + + @Override + public Document toDocument(ITuple tuple) { + Document document = new Document(); + for(String field : fields){ + document.append(field, tuple.getValueByField(field)); + } + return new Document("$set", document); + } + + public SimpleMongoUpdateMapper withFields(String... fields) { + this.fields = fields; + return this; + } +} +``` + + + +### QueryFilterCreator +The main API for creating a MongoDB query Filter is the `org.apache.storm.mongodb.common.QueryFilterCreator` interface: + + ```java +public interface QueryFilterCreator extends Serializable { + Bson createFilter(ITuple tuple); +} + ``` + +### SimpleQueryFilterCreator +`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation called `SimpleQueryFilterCreator` that can create a MongoDB query Filter by given Tuple. `QueryFilterCreator` uses `$eq` operator for matching values that are equal to a specified value. More information about query operator, you can visit +https://docs.mongodb.org/manual/reference/operator/query/ + + ```java +public class SimpleQueryFilterCreator implements QueryFilterCreator { + private String field; + + @Override + public Bson createFilter(ITuple tuple) { + return Filters.eq(field, tuple.getValueByField(field)); + } + + public SimpleQueryFilterCreator withField(String field) { + this.field = field; + return this; + } + +} + ``` + +### MongoUpdateBolt +To use the `MongoUpdateBolt`, you construct an instance of it by specifying Mongo url, collectionName, a `QueryFilterCreator` implementation and a `MongoMapper` implementation that converts storm tuple to DB document. + + ```java + MongoMapper mapper = new SimpleMongoUpdateMapper() + .withFields("word", "count"); + + QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator() + .withField("word"); + + MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper); + + //if a new document should be inserted if there are no matches to the query filter + //updateBolt.withUpsert(true); + ``` + + Or use a anonymous inner class implementation for `QueryFilterCreator`: + + ```java + MongoMapper mapper = new SimpleMongoUpdateMapper() + .withFields("word", "count"); + + QueryFilterCreator updateQueryCreator = new QueryFilterCreator() { + @Override + public Bson createFilter(ITuple tuple) { + return Filters.gt("count", 3); + } + }; + + MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper); + + //if a new document should be inserted if there are no matches to the query filter + //updateBolt.withUpsert(true); + ``` + +## License + +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. + +## Committer Sponsors + + * Sriharsha Chintalapani ([[email protected]](mailto:[email protected])) + http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/pom.xml b/external/storm-mongodb/pom.xml new file mode 100644 index 0000000..7653ac8 --- /dev/null +++ b/external/storm-mongodb/pom.xml @@ -0,0 +1,74 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>storm-mongodb</artifactId> + + <developers> + <developer> + <id>vesense</id> + <name>Xin Wang</name> + <email>[email protected]</email> + </developer> + </developers> + + <properties> + <mongodb.version>3.2.0</mongodb.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + <version>${mongodb.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + <!--test dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java new file mode 100644 index 0000000..f730ec7 --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.bolt; + +import java.util.Map; + +import org.apache.commons.lang.Validate; +import org.apache.storm.mongodb.common.MongoDBClient; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.base.BaseRichBolt; + +public abstract class AbstractMongoBolt extends BaseRichBolt { + + private String url; + private String collectionName; + + protected OutputCollector collector; + protected MongoDBClient mongoClient; + + public AbstractMongoBolt(String url, String collectionName) { + Validate.notEmpty(url, "url can not be blank or null"); + Validate.notEmpty(collectionName, "collectionName can not be blank or null"); + + this.url = url; + this.collectionName = collectionName; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + this.mongoClient = new MongoDBClient(url, collectionName); + } + + @Override + public void cleanup() { + this.mongoClient.close(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java new file mode 100644 index 0000000..26cd150 --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.bolt; + +import org.apache.commons.lang.Validate; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.bson.Document; + +/** + * Basic bolt for writing to MongoDB. + * + * Note: Each MongoInsertBolt defined in a topology is tied to a specific collection. + * + */ +public class MongoInsertBolt extends AbstractMongoBolt { + + private MongoMapper mapper; + + public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) { + super(url, collectionName); + + Validate.notNull(mapper, "MongoMapper can not be null"); + + this.mapper = mapper; + } + + @Override + public void execute(Tuple tuple) { + try{ + //get document + Document doc = mapper.toDocument(tuple); + mongoClient.insert(doc); + this.collector.ack(tuple); + } catch (Exception e) { + this.collector.reportError(e); + this.collector.fail(tuple); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java new file mode 100644 index 0000000..1994993 --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.bolt; + +import org.apache.commons.lang.Validate; +import org.apache.storm.mongodb.common.QueryFilterCreator; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.bson.Document; +import org.bson.conversions.Bson; + +/** + * Basic bolt for updating from MongoDB. + * + * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection. + * + */ +public class MongoUpdateBolt extends AbstractMongoBolt { + + private QueryFilterCreator queryCreator; + private MongoMapper mapper; + + private boolean upsert; //The default is false. + + public MongoUpdateBolt(String url, String collectionName, QueryFilterCreator queryCreator, MongoMapper mapper) { + super(url, collectionName); + + Validate.notNull(queryCreator, "QueryFilterCreator can not be null"); + Validate.notNull(mapper, "MongoMapper can not be null"); + + this.queryCreator = queryCreator; + this.mapper = mapper; + } + + @Override + public void execute(Tuple tuple) { + try{ + //get document + Document doc = mapper.toDocument(tuple); + //get query filter + Bson filter = queryCreator.createFilter(tuple); + mongoClient.update(filter, doc, upsert); + this.collector.ack(tuple); + } catch (Exception e) { + this.collector.reportError(e); + this.collector.fail(tuple); + } + } + + public void withUpsert(boolean upsert) { + this.upsert = upsert; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java new file mode 100644 index 0000000..be2e376 --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.common; + +import java.util.List; + +import org.bson.Document; +import org.bson.conversions.Bson; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.UpdateOptions; + +public class MongoDBClient { + + private MongoClient client; + private MongoCollection<Document> collection; + + public MongoDBClient(String url, String collectionName) { + //Creates a MongoURI from the given string. + MongoClientURI uri = new MongoClientURI(url); + //Creates a MongoClient described by a URI. + this.client = new MongoClient(uri); + //Gets a Database. + MongoDatabase db = client.getDatabase(uri.getDatabase()); + //Gets a collection. + this.collection = db.getCollection(collectionName); + } + + /** + * Inserts the provided document. + * + * @param document + */ + public void insert(Document document) { + collection.insertOne(document); + } + + /** + * Inserts one or more documents. + * This method is equivalent to a call to the bulkWrite method. + * The documents will be inserted in the order provided, + * stopping on the first failed insertion. + * + * @param documents + */ + public void insert(List<Document> documents) { + collection.insertMany(documents); + } + + /** + * Update all documents in the collection according to the specified query filter. + * When upsert set to true, the new document will be inserted if there are no matches to the query filter. + * + * @param filter + * @param update + * @param upsert + */ + public void update(Bson filter, Bson update, boolean upsert) { + UpdateOptions options = new UpdateOptions(); + if(upsert) { + options.upsert(true); + } + collection.updateMany(filter, update, options); + } + + /** + * Closes all resources associated with this instance. + */ + public void close(){ + client.close(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java new file mode 100644 index 0000000..d95f717 --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.common; + +import java.io.Serializable; + +import org.apache.storm.tuple.ITuple; +import org.bson.conversions.Bson; + +/** + * Create a MongoDB query Filter by given Tuple. + */ +public interface QueryFilterCreator extends Serializable { + + /** + * Create a query Filter by given Tuple + * + * @param tuple + * @return query Filter + */ + Bson createFilter(ITuple tuple); + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java new file mode 100644 index 0000000..8b4f1c3 --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.common; + +import org.apache.storm.tuple.ITuple; +import org.bson.conversions.Bson; + +import com.mongodb.client.model.Filters; + +public class SimpleQueryFilterCreator implements QueryFilterCreator { + + private String field; + + @Override + public Bson createFilter(ITuple tuple) { + return Filters.eq(field, tuple.getValueByField(field)); + } + + public SimpleQueryFilterCreator withField(String field) { + this.field = field; + return this; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java new file mode 100644 index 0000000..7bcd499 --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.common.mapper; + +import java.io.Serializable; + +import org.apache.storm.tuple.ITuple; +import org.bson.Document; + +/** + * Given a Tuple, converts it to an MongoDB document. + */ +public interface MongoMapper extends Serializable { + + /** + * Converts a Tuple to a Document + * + * @param tuple the incoming tuple + * @return the MongoDB document + */ + Document toDocument(ITuple tuple); + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java new file mode 100644 index 0000000..4440962 --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.common.mapper; + +import org.apache.storm.tuple.ITuple; +import org.bson.Document; + +public class SimpleMongoMapper implements MongoMapper { + + private String[] fields; + + @Override + public Document toDocument(ITuple tuple) { + Document document = new Document(); + for(String field : fields){ + document.append(field, tuple.getValueByField(field)); + } + return document; + } + + public SimpleMongoMapper withFields(String... fields) { + this.fields = fields; + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java new file mode 100644 index 0000000..f07d4dc --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.common.mapper; + +import org.apache.storm.tuple.ITuple; +import org.bson.Document; + +public class SimpleMongoUpdateMapper implements MongoMapper { + + private String[] fields; + + @Override + public Document toDocument(ITuple tuple) { + Document document = new Document(); + for(String field : fields){ + document.append(field, tuple.getValueByField(field)); + } + //$set operator: Sets the value of a field in a document. + return new Document("$set", document); + } + + public SimpleMongoUpdateMapper withFields(String... fields) { + this.fields = fields; + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java new file mode 100644 index 0000000..843fcee --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.trident.state; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.Validate; +import org.apache.storm.mongodb.common.MongoDBClient; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; +import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class MongoState implements State { + + private static final Logger LOG = LoggerFactory.getLogger(MongoState.class); + + private Options options; + private MongoDBClient mongoClient; + private Map map; + + protected MongoState(Map map, Options options) { + this.options = options; + this.map = map; + } + + public static class Options implements Serializable { + private String url; + private String collectionName; + private MongoMapper mapper; + + public Options withUrl(String url) { + this.url = url; + return this; + } + + public Options withCollectionName(String collectionName) { + this.collectionName = collectionName; + return this; + } + + public Options withMapper(MongoMapper mapper) { + this.mapper = mapper; + return this; + } + } + + protected void prepare() { + Validate.notEmpty(options.url, "url can not be blank or null"); + Validate.notEmpty(options.collectionName, "collectionName can not be blank or null"); + Validate.notNull(options.mapper, "MongoMapper can not be null"); + + this.mongoClient = new MongoDBClient(options.url, options.collectionName); + } + + @Override + public void beginCommit(Long txid) { + LOG.debug("beginCommit is noop."); + } + + @Override + public void commit(Long txid) { + LOG.debug("commit is noop."); + } + + public void updateState(List<TridentTuple> tuples, TridentCollector collector) { + List<Document> documents = Lists.newArrayList(); + for (TridentTuple tuple : tuples) { + Document document = options.mapper.toDocument(tuple); + documents.add(document); + } + this.mongoClient.insert(documents); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java new file mode 100644 index 0000000..d6cd3a5 --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.trident.state; + +import java.util.Map; + +import org.apache.storm.task.IMetricsContext; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; + +public class MongoStateFactory implements StateFactory { + + private MongoState.Options options; + + public MongoStateFactory(MongoState.Options options) { + this.options = options; + } + + @Override + public State makeState(Map conf, IMetricsContext metrics, + int partitionIndex, int numPartitions) { + MongoState state = new MongoState(conf, options); + state.prepare(); + return state; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java new file mode 100644 index 0000000..3173f6c --- /dev/null +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.trident.state; + +import java.util.List; + +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.BaseStateUpdater; +import org.apache.storm.trident.tuple.TridentTuple; + +public class MongoStateUpdater extends BaseStateUpdater<MongoState> { + + @Override + public void updateState(MongoState state, List<TridentTuple> tuples, + TridentCollector collector) { + state.updateState(tuples, collector); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java new file mode 100644 index 0000000..c83bdbd --- /dev/null +++ b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/InsertWordCount.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.topology; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.mongodb.bolt.MongoInsertBolt; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper; + +import java.util.HashMap; +import java.util.Map; + +public class InsertWordCount { + private static final String WORD_SPOUT = "WORD_SPOUT"; + private static final String COUNT_BOLT = "COUNT_BOLT"; + private static final String INSERT_BOLT = "INSERT_BOLT"; + + private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test"; + private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount"; + + + public static void main(String[] args) throws Exception { + Config config = new Config(); + + String url = TEST_MONGODB_URL; + String collectionName = TEST_MONGODB_COLLECTION_NAME; + + if (args.length >= 2) { + url = args[0]; + collectionName = args[1]; + } + + WordSpout spout = new WordSpout(); + WordCounter bolt = new WordCounter(); + + MongoMapper mapper = new SimpleMongoMapper() + .withFields("word", "count"); + + MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper); + + // wordSpout ==> countBolt ==> MongoInsertBolt + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout(WORD_SPOUT, spout, 1); + builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); + builder.setBolt(INSERT_BOLT, insertBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word")); + + + if (args.length == 2) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", config, builder.createTopology()); + Thread.sleep(30000); + cluster.killTopology("test"); + cluster.shutdown(); + System.exit(0); + } else if (args.length == 3) { + StormSubmitter.submitTopology(args[2], config, builder.createTopology()); + } else{ + System.out.println("Usage: InsertWordCount <mongodb url> <mongodb collection> [topology name]"); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java new file mode 100644 index 0000000..071708e --- /dev/null +++ b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/UpdateWordCount.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.topology; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.mongodb.bolt.MongoInsertBolt; +import org.apache.storm.mongodb.bolt.MongoUpdateBolt; +import org.apache.storm.mongodb.common.QueryFilterCreator; +import org.apache.storm.mongodb.common.SimpleQueryFilterCreator; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper; +import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper; + +import java.util.HashMap; +import java.util.Map; + +public class UpdateWordCount { + private static final String WORD_SPOUT = "WORD_SPOUT"; + private static final String COUNT_BOLT = "COUNT_BOLT"; + private static final String UPDATE_BOLT = "UPDATE_BOLT"; + + private static final String TEST_MONGODB_URL = "mongodb://127.0.0.1:27017/test"; + private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount"; + + + public static void main(String[] args) throws Exception { + Config config = new Config(); + + String url = TEST_MONGODB_URL; + String collectionName = TEST_MONGODB_COLLECTION_NAME; + + if (args.length >= 2) { + url = args[0]; + collectionName = args[1]; + } + + WordSpout spout = new WordSpout(); + WordCounter bolt = new WordCounter(); + + MongoMapper mapper = new SimpleMongoUpdateMapper() + .withFields("word", "count"); + + QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator() + .withField("word"); + + MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator , mapper); + + //if a new document should be inserted if there are no matches to the query filter + //updateBolt.withUpsert(true); + + // wordSpout ==> countBolt ==> MongoUpdateBolt + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout(WORD_SPOUT, spout, 1); + builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); + builder.setBolt(UPDATE_BOLT, updateBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word")); + + + if (args.length == 2) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", config, builder.createTopology()); + Thread.sleep(30000); + cluster.killTopology("test"); + cluster.shutdown(); + System.exit(0); + } else if (args.length == 3) { + StormSubmitter.submitTopology(args[2], config, builder.createTopology()); + } else{ + System.out.println("Usage: UpdateWordCount <mongodb url> <mongodb collection> [topology name]"); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java new file mode 100644 index 0000000..481f959 --- /dev/null +++ b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordCounter.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.topology; + +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.IBasicBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import com.google.common.collect.Maps; + +import java.util.Map; + +import static org.apache.storm.utils.Utils.tuple; + +public class WordCounter implements IBasicBolt { + private Map<String, Integer> wordCounter = Maps.newHashMap(); + + public void prepare(Map stormConf, TopologyContext context) { + + } + + public void execute(Tuple input, BasicOutputCollector collector) { + String word = input.getStringByField("word"); + int count; + if (wordCounter.containsKey(word)) { + count = wordCounter.get(word) + 1; + wordCounter.put(word, wordCounter.get(word) + 1); + } else { + count = 1; + } + + wordCounter.put(word, count); + collector.emit(new Values(word, String.valueOf(count))); + } + + public void cleanup() { + + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java new file mode 100644 index 0000000..284f228 --- /dev/null +++ b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/topology/WordSpout.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.topology; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +public class WordSpout implements IRichSpout { + boolean isDistributed; + SpoutOutputCollector collector; + public static final String[] words = new String[] { "apple", "orange", "pineapple", "banana", "watermelon" }; + + public WordSpout() { + this(true); + } + + public WordSpout(boolean isDistributed) { + this.isDistributed = isDistributed; + } + + public boolean isDistributed() { + return this.isDistributed; + } + + @SuppressWarnings("rawtypes") + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + } + + public void close() { + + } + + public void nextTuple() { + final Random rand = new Random(); + final String word = words[rand.nextInt(words.length)]; + this.collector.emit(new Values(word), UUID.randomUUID()); + Thread.yield(); + } + + public void ack(Object msgId) { + + } + + public void fail(Object msgId) { + + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + + @Override + public void activate() { + } + + @Override + public void deactivate() { + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java new file mode 100644 index 0000000..7a18863 --- /dev/null +++ b/external/storm-mongodb/src/test/java/org/apache/storm/mongodb/trident/WordCountTrident.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.trident; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper; +import org.apache.storm.mongodb.trident.state.MongoState; +import org.apache.storm.mongodb.trident.state.MongoStateFactory; +import org.apache.storm.mongodb.trident.state.MongoStateUpdater; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentState; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.state.StateFactory; +import org.apache.storm.trident.testing.FixedBatchSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +public class WordCountTrident { + + public static StormTopology buildTopology(String url, String collectionName){ + Fields fields = new Fields("word", "count"); + FixedBatchSpout spout = new FixedBatchSpout(fields, 4, + new Values("storm", 1), + new Values("trident", 1), + new Values("needs", 1), + new Values("javadoc", 1) + ); + spout.setCycle(true); + + MongoMapper mapper = new SimpleMongoMapper() + .withFields("word", "count"); + + MongoState.Options options = new MongoState.Options() + .withUrl(url) + .withCollectionName(collectionName) + .withMapper(mapper); + + StateFactory factory = new MongoStateFactory(options); + + TridentTopology topology = new TridentTopology(); + Stream stream = topology.newStream("spout1", spout); + + stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields()); + return topology.build(); + } + + public static void main(String[] args) throws Exception { + Config conf = new Config(); + conf.setMaxSpoutPending(5); + if (args.length == 2) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1])); + Thread.sleep(60 * 1000); + cluster.killTopology("wordCounter"); + cluster.shutdown(); + System.exit(0); + } + else if(args.length == 3) { + conf.setNumWorkers(3); + StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1])); + } else{ + System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]"); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index facd824..213e5ab 100644 --- a/pom.xml +++ b/pom.xml @@ -277,6 +277,7 @@ <module>external/storm-metrics</module> <module>external/storm-cassandra</module> <module>external/storm-mqtt</module> + <module>external/storm-mongodb</module> <module>examples/storm-starter</module> </modules> http://git-wip-us.apache.org/repos/asf/storm/blob/45fe4595/storm-dist/binary/src/main/assembly/binary.xml ---------------------------------------------------------------------- diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml index 6d40c19..9332283 100644 --- a/storm-dist/binary/src/main/assembly/binary.xml +++ b/storm-dist/binary/src/main/assembly/binary.xml @@ -303,6 +303,20 @@ <include>storm*jar</include> </includes> </fileSet> + <fileSet> + <directory>${project.basedir}/../../external/storm-mongodb/target</directory> + <outputDirectory>external/storm-mongodb</outputDirectory> + <includes> + <include>storm*jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${project.basedir}/../../external/storm-mongodb</directory> + <outputDirectory>external/storm-mongodb</outputDirectory> + <includes> + <include>README.*</include> + </includes> + </fileSet> <!-- $STORM_HOME/extlib --> <fileSet>
