Repository: storm Updated Branches: refs/heads/master 2f1411da3 -> 98dbdcdb5
STORM-1573: Add batch support for MongoInsertBolt Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/78f17061 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/78f17061 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/78f17061 Branch: refs/heads/master Commit: 78f17061f7af82e82b011dcadff0829cad202227 Parents: 50701df Author: vesense <[email protected]> Authored: Thu Mar 17 20:51:51 2016 +0800 Committer: vesense <[email protected]> Committed: Thu Mar 31 12:45:59 2016 +0800 ---------------------------------------------------------------------- .../storm/mongodb/bolt/MongoInsertBolt.java | 74 ++++++++++++++++++-- .../storm/mongodb/bolt/MongoUpdateBolt.java | 3 +- .../storm/mongodb/common/MongoDBClient.java | 20 +++--- .../storm/mongodb/trident/state/MongoState.java | 2 +- 4 files changed, 80 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/78f17061/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java index 26cd150..a030a6c 100644 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java @@ -17,11 +17,18 @@ */ package org.apache.storm.mongodb.bolt; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + import org.apache.commons.lang.Validate; import org.apache.storm.mongodb.common.mapper.MongoMapper; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Basic bolt for writing to MongoDB. @@ -30,30 +37,87 @@ import org.bson.Document; * */ public class MongoInsertBolt extends AbstractMongoBolt { + private static final Logger LOG = LoggerFactory.getLogger(MongoInsertBolt.class); + + private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1; private MongoMapper mapper; + private boolean ordered = true; //default is ordered. + + private int batchSize = 15000; + + private List<Tuple> tupleBatch; + + private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS; + public MongoInsertBolt(String url, String collectionName, MongoMapper mapper) { super(url, collectionName); Validate.notNull(mapper, "MongoMapper can not be null"); this.mapper = mapper; + + this.tupleBatch = new LinkedList<>(); } @Override public void execute(Tuple tuple) { + boolean forceFlush = false; try{ - //get document - Document doc = mapper.toDocument(tuple); - mongoClient.insert(doc); - this.collector.ack(tuple); + if (TupleUtils.isTick(tuple)) { + LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), batchSize); + collector.ack(tuple); + forceFlush = true; + } else { + tupleBatch.add(tuple); + if (tupleBatch.size() >= batchSize) { + forceFlush = true; + } + } + + if(forceFlush && !tupleBatch.isEmpty()) { + List<Document> docs = new LinkedList<>(); + for (Tuple t : tupleBatch) { + Document doc = mapper.toDocument(t); + docs.add(doc); + } + mongoClient.insert(docs, ordered); + + for(Tuple t : tupleBatch) { + collector.ack(t); + } + tupleBatch.clear(); + } } catch (Exception e) { this.collector.reportError(e); - this.collector.fail(tuple); + for (Tuple t : tupleBatch) { + collector.fail(t); + } + tupleBatch.clear(); } } + public MongoInsertBolt withBatchSize(int batchSize) { + this.batchSize = batchSize; + return this; + } + + public MongoInsertBolt withOrdered(boolean ordered) { + this.ordered = ordered; + return this; + } + + public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) { + this.flushIntervalSecs = flushIntervalSecs; + return this; + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs); + } + @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { http://git-wip-us.apache.org/repos/asf/storm/blob/78f17061/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java index 1994993..510a3d0 100644 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java @@ -63,8 +63,9 @@ public class MongoUpdateBolt extends AbstractMongoBolt { } } - public void withUpsert(boolean upsert) { + public MongoUpdateBolt withUpsert(boolean upsert) { this.upsert = upsert; + return this; } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/78f17061/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java index be2e376..cb4c454 100644 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java @@ -26,6 +26,7 @@ import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.InsertManyOptions; import com.mongodb.client.model.UpdateOptions; public class MongoDBClient { @@ -45,15 +46,6 @@ public class MongoDBClient { } /** - * Inserts the provided document. - * - * @param document - */ - public void insert(Document document) { - collection.insertOne(document); - } - - /** * Inserts one or more documents. * This method is equivalent to a call to the bulkWrite method. * The documents will be inserted in the order provided, @@ -61,8 +53,12 @@ public class MongoDBClient { * * @param documents */ - public void insert(List<Document> documents) { - collection.insertMany(documents); + public void insert(List<Document> documents, boolean ordered) { + InsertManyOptions options = new InsertManyOptions(); + if (!ordered) { + options.ordered(false); + } + collection.insertMany(documents, options); } /** @@ -75,7 +71,7 @@ public class MongoDBClient { */ public void update(Bson filter, Bson update, boolean upsert) { UpdateOptions options = new UpdateOptions(); - if(upsert) { + if (upsert) { options.upsert(true); } collection.updateMany(filter, update, options); http://git-wip-us.apache.org/repos/asf/storm/blob/78f17061/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java ---------------------------------------------------------------------- diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java index 843fcee..112b170 100644 --- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java +++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java @@ -91,7 +91,7 @@ public class MongoState implements State { Document document = options.mapper.toDocument(tuple); documents.add(document); } - this.mongoClient.insert(documents); + this.mongoClient.insert(documents, true); } }
