Repository: storm
Updated Branches:
  refs/heads/master 2f1411da3 -> 98dbdcdb5


STORM-1573: Add batch support for MongoInsertBolt


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/78f17061
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/78f17061
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/78f17061

Branch: refs/heads/master
Commit: 78f17061f7af82e82b011dcadff0829cad202227
Parents: 50701df
Author: vesense <[email protected]>
Authored: Thu Mar 17 20:51:51 2016 +0800
Committer: vesense <[email protected]>
Committed: Thu Mar 31 12:45:59 2016 +0800

----------------------------------------------------------------------
 .../storm/mongodb/bolt/MongoInsertBolt.java     | 74 ++++++++++++++++++--
 .../storm/mongodb/bolt/MongoUpdateBolt.java     |  3 +-
 .../storm/mongodb/common/MongoDBClient.java     | 20 +++---
 .../storm/mongodb/trident/state/MongoState.java |  2 +-
 4 files changed, 80 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/78f17061/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
index 26cd150..a030a6c 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
@@ -17,11 +17,18 @@
  */
 package org.apache.storm.mongodb.bolt;
 
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.lang.Validate;
 import org.apache.storm.mongodb.common.mapper.MongoMapper;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
 import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Basic bolt for writing to MongoDB.
@@ -30,30 +37,87 @@ import org.bson.Document;
  *
  */
 public class MongoInsertBolt extends AbstractMongoBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(MongoInsertBolt.class);
+
+    /** Tick-tuple interval, in seconds, used unless {@link #withFlushIntervalSecs(int)} is called. */
+    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
 
     private MongoMapper mapper;
 
+    /** Passed to the client's batch insert; ordered inserts are the default. */
+    private boolean ordered = true;
+
+    /** Number of buffered tuples that triggers an immediate flush. */
+    private int batchSize = 15000;
+
+    /** Tuples received since the last flush; they are acked or failed together. */
+    private List<Tuple> tupleBatch;
+
+    /** Interval, in seconds, of the tick tuples that flush a partially filled batch. */
+    private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
+
     public MongoInsertBolt(String url, String collectionName, MongoMapper 
mapper) {
         super(url, collectionName);
 
         Validate.notNull(mapper, "MongoMapper can not be null");
 
         this.mapper = mapper;
+
+        this.tupleBatch = new LinkedList<>();
     }
 
     @Override
     public void execute(Tuple tuple) {
+        boolean forceFlush = false;
         try{
-            //get document
-            Document doc = mapper.toDocument(tuple);
-            mongoClient.insert(doc);
-            this.collector.ack(tuple);
+            if (TupleUtils.isTick(tuple)) {
+                LOG.debug("TICK received! current batch status [{}/{}]", 
tupleBatch.size(), batchSize);
+                collector.ack(tuple);
+                forceFlush = true;
+            } else {
+                tupleBatch.add(tuple);
+                if (tupleBatch.size() >= batchSize) {
+                    forceFlush = true;
+                }
+            }
+
+            if(forceFlush && !tupleBatch.isEmpty()) {
+                List<Document> docs = new LinkedList<>();
+                for (Tuple t : tupleBatch) {
+                    Document doc = mapper.toDocument(t);
+                    docs.add(doc);
+                }
+                mongoClient.insert(docs, ordered);
+
+                for(Tuple t : tupleBatch) {
+                    collector.ack(t);
+                }
+                tupleBatch.clear();
+            }
         } catch (Exception e) {
             this.collector.reportError(e);
-            this.collector.fail(tuple);
+            for (Tuple t : tupleBatch) {
+                collector.fail(t);
+            }
+            tupleBatch.clear();
         }
     }
 
+    /**
+     * Sets how many buffered tuples trigger an immediate flush.
+     *
+     * @param batchSize flush threshold; must be positive
+     * @return this bolt, for chaining
+     * @throws IllegalArgumentException if {@code batchSize} is not positive
+     */
+    public MongoInsertBolt withBatchSize(int batchSize) {
+        Validate.isTrue(batchSize > 0, "batchSize must be positive");
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    /**
+     * Chooses between ordered and unordered batch inserts (ordered by default).
+     *
+     * @param ordered value passed to the client's batch insert
+     * @return this bolt, for chaining
+     */
+    public MongoInsertBolt withOrdered(boolean ordered) {
+        this.ordered = ordered;
+        return this;
+    }
+
+    /**
+     * Sets the interval, in seconds, of the tick tuples that flush a partially
+     * filled batch.
+     *
+     * @param flushIntervalSecs tick interval in seconds; must be positive
+     * @return this bolt, for chaining
+     * @throws IllegalArgumentException if {@code flushIntervalSecs} is not positive
+     */
+    public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
+        Validate.isTrue(flushIntervalSecs > 0, "flushIntervalSecs must be positive");
+        this.flushIntervalSecs = flushIntervalSecs;
+        return this;
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return 
TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(),
 flushIntervalSecs);
+    }
+
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         

http://git-wip-us.apache.org/repos/asf/storm/blob/78f17061/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
index 1994993..510a3d0 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
@@ -63,8 +63,9 @@ public class MongoUpdateBolt extends AbstractMongoBolt {
         }
     }
 
-    public void withUpsert(boolean upsert) {
+    /**
+     * Sets the upsert flag of this bolt.
+     *
+     * @param upsert the new value of the upsert flag
+     * @return this bolt, for chaining
+     */
+    public MongoUpdateBolt withUpsert(boolean upsert) {
         this.upsert = upsert;
+        return this;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/78f17061/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
index be2e376..cb4c454 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java
@@ -26,6 +26,7 @@ import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertManyOptions;
 import com.mongodb.client.model.UpdateOptions;
 
 public class MongoDBClient {
@@ -45,15 +46,6 @@ public class MongoDBClient {
     }
 
     /**
-     * Inserts the provided document.
-     * 
-     * @param document
-     */
-    public void insert(Document document) {
-        collection.insertOne(document);
-    }
-
-    /**
      * Inserts one or more documents.
      * This method is equivalent to a call to the bulkWrite method.
      * The documents will be inserted in the order provided, 
@@ -61,8 +53,12 @@ public class MongoDBClient {
      * 
      * @param documents
      */
-    public void insert(List<Document> documents) {
-        collection.insertMany(documents);
+    public void insert(List<Document> documents, boolean ordered) {
+        // insertMany rejects an empty list, so an empty batch is treated as a no-op.
+        if (documents.isEmpty()) {
+            return;
+        }
+        // ordered == true (driver default): stop at the first failed write.
+        // ordered == false: continue with the remaining writes after a failure.
+        collection.insertMany(documents, new InsertManyOptions().ordered(ordered));
     }
 
     /**
@@ -75,7 +71,7 @@ public class MongoDBClient {
      */
     public void update(Bson filter, Bson update, boolean upsert) {
         UpdateOptions options = new UpdateOptions();
-        if(upsert) {
+        if (upsert) {
             options.upsert(true);
         }
         collection.updateMany(filter, update, options);

http://git-wip-us.apache.org/repos/asf/storm/blob/78f17061/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
index 843fcee..112b170 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
@@ -91,7 +91,7 @@ public class MongoState implements State {
             Document document = options.mapper.toDocument(tuple);
             documents.add(document);
         }
-        this.mongoClient.insert(documents);
+        this.mongoClient.insert(documents, true);
     }
 
 }

Reply via email to