This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch STORM-3988
in repository https://gitbox.apache.org/repos/asf/storm.git

commit ee7d71c5ec4bd36dcc86cb1edf436d423180c57f
Author: Richard Zowalla <[email protected]>
AuthorDate: Thu Oct 19 08:58:34 2023 +0200

    STORM-3988 - Remove "storm-mongodb"
---
 examples/storm-mongodb-examples/pom.xml            |  96 ------
 .../storm/mongodb/topology/InsertWordCount.java    |  72 -----
 .../storm/mongodb/topology/LookupWordCount.java    |  76 -----
 .../storm/mongodb/topology/TotalWordCounter.java   |  74 -----
 .../storm/mongodb/topology/UpdateWordCount.java    |  83 ------
 .../apache/storm/mongodb/topology/WordCounter.java |  71 -----
 .../apache/storm/mongodb/topology/WordSpout.java   |  94 ------
 .../storm/mongodb/trident/PrintFunction.java       |  41 ---
 .../storm/mongodb/trident/WordCountTrident.java    |  87 ------
 .../storm/mongodb/trident/WordCountTridentMap.java |  89 ------
 external/storm-mongodb/README.md                   | 325 ---------------------
 external/storm-mongodb/pom.xml                     |  82 ------
 .../storm/mongodb/bolt/AbstractMongoBolt.java      |  62 ----
 .../apache/storm/mongodb/bolt/MongoInsertBolt.java | 124 --------
 .../apache/storm/mongodb/bolt/MongoLookupBolt.java |  86 ------
 .../apache/storm/mongodb/bolt/MongoUpdateBolt.java |  93 ------
 .../apache/storm/mongodb/common/MongoDbClient.java | 109 -------
 .../apache/storm/mongodb/common/MongoUtils.java    |  44 ---
 .../storm/mongodb/common/QueryFilterCreator.java   |  47 ---
 .../mongodb/common/SimpleQueryFilterCreator.java   |  47 ---
 .../mongodb/common/mapper/MongoLookupMapper.java   |  45 ---
 .../storm/mongodb/common/mapper/MongoMapper.java   |  47 ---
 .../mongodb/common/mapper/MongoUpdateMapper.java   |  26 --
 .../common/mapper/SimpleMongoLookupMapper.java     |  64 ----
 .../mongodb/common/mapper/SimpleMongoMapper.java   |  55 ----
 .../common/mapper/SimpleMongoUpdateMapper.java     |  47 ---
 .../storm/mongodb/trident/state/MongoMapState.java | 213 --------------
 .../storm/mongodb/trident/state/MongoState.java    | 145 ---------
 .../mongodb/trident/state/MongoStateFactory.java   |  43 ---
 .../mongodb/trident/state/MongoStateQuery.java     |  41 ---
 .../mongodb/trident/state/MongoStateUpdater.java   |  35 ---
 pom.xml                                            |   2 -
 32 files changed, 2565 deletions(-)

diff --git a/examples/storm-mongodb-examples/pom.xml 
b/examples/storm-mongodb-examples/pom.xml
deleted file mode 100644
index f7b0b78b3..000000000
--- a/examples/storm-mongodb-examples/pom.xml
+++ /dev/null
@@ -1,96 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.6.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>storm-mongodb-examples</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-client</artifactId>
-            <version>${project.version}</version>
-            <scope>${provided.scope}</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-mongodb</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <configuration>
-                    
<createDependencyReducedPom>true</createDependencyReducedPom>
-                    <filters>
-                        <filter>
-                            <artifact>*:*</artifact>
-                            <excludes>
-                                <exclude>META-INF/*.SF</exclude>
-                                <exclude>META-INF/*.sf</exclude>
-                                <exclude>META-INF/*.DSA</exclude>
-                                <exclude>META-INF/*.dsa</exclude>
-                                <exclude>META-INF/*.RSA</exclude>
-                                <exclude>META-INF/*.rsa</exclude>
-                                <exclude>META-INF/*.EC</exclude>
-                                <exclude>META-INF/*.ec</exclude>
-                                <exclude>META-INF/MSFTSIG.SF</exclude>
-                                <exclude>META-INF/MSFTSIG.RSA</exclude>
-                            </excludes>
-                        </filter>
-                    </filters>
-                </configuration>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <transformers>
-                                <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
 />
-                                <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                </transformer>
-                            </transformers>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <!--Note - the version would be inherited-->
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-pmd-plugin</artifactId>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
 
b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
deleted file mode 100644
index f877fca07..000000000
--- 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.topology;
-
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.mongodb.bolt.MongoInsertBolt;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-
-public class InsertWordCount {
-    private static final String WORD_SPOUT = "WORD_SPOUT";
-    private static final String COUNT_BOLT = "COUNT_BOLT";
-    private static final String INSERT_BOLT = "INSERT_BOLT";
-
-    private static final String TEST_MONGODB_URL = 
"mongodb://127.0.0.1:27017/test";
-    private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
-    
-
-    public static void main(String[] args) throws Exception {
-        String url = TEST_MONGODB_URL;
-        String collectionName = TEST_MONGODB_COLLECTION_NAME;
-
-        if (args.length >= 2) {
-            url = args[0];
-            collectionName = args[1];
-        }
-
-        WordSpout spout = new WordSpout();
-        WordCounter bolt = new WordCounter();
-
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-        
-        MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, 
mapper);
-
-        // wordSpout ==> countBolt ==> MongoInsertBolt
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout(WORD_SPOUT, spout, 1);
-        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
-        builder.setBolt(INSERT_BOLT, insertBolt, 1).fieldsGrouping(COUNT_BOLT, 
new Fields("word"));
-
-        String topoName = "test";
-        if (args.length == 3) {
-            topoName = args[2];
-        } else if (args.length > 3) {
-            System.out.println("Usage: InsertWordCount <mongodb url> <mongodb 
collection> [topology name]");
-            return;
-        }
-        Config config = new Config();
-        StormSubmitter.submitTopology(topoName, config, 
builder.createTopology());
-    }
-}
diff --git 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
 
b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
deleted file mode 100644
index 6aaec4208..000000000
--- 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.topology;
-
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.mongodb.bolt.MongoLookupBolt;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoLookupMapper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-
-public class LookupWordCount {
-    private static final String WORD_SPOUT = "WORD_SPOUT";
-    private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
-    private static final String TOTAL_COUNT_BOLT = "TOTAL_COUNT_BOLT";
-
-    private static final String TEST_MONGODB_URL = 
"mongodb://127.0.0.1:27017/test";
-    private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
-
-    public static void main(String[] args) throws Exception {
-        String url = TEST_MONGODB_URL;
-        String collectionName = TEST_MONGODB_COLLECTION_NAME;
-
-        if (args.length >= 2) {
-            url = args[0];
-            collectionName = args[1];
-        }
-
-        WordSpout spout = new WordSpout();
-        TotalWordCounter totalBolt = new TotalWordCounter();
-
-        MongoLookupMapper mapper = new SimpleMongoLookupMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-
-        MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, 
filterCreator, mapper);
-
-        //wordspout -> lookupbolt -> totalCountBolt
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout(WORD_SPOUT, spout, 1);
-        builder.setBolt(LOOKUP_BOLT, lookupBolt, 
1).shuffleGrouping(WORD_SPOUT);
-        builder.setBolt(TOTAL_COUNT_BOLT, totalBolt, 
1).fieldsGrouping(LOOKUP_BOLT, new Fields("word"));
-
-        String topoName = "test";
-        if (args.length == 3) {
-            topoName = args[2];
-        } else if (args.length > 3) {
-            System.out.println("Usage: LookupWordCount <mongodb url> <mongodb 
collection> [topology name]");
-            return;
-        }
-
-        Config config = new Config();
-        StormSubmitter.submitTopology(topoName, config, 
builder.createTopology());
-    }
-}
diff --git 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
 
b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
deleted file mode 100644
index 148f2f914..000000000
--- 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/TotalWordCounter.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.topology;
-
-import static org.apache.storm.utils.Utils.tuple;
-
-import java.math.BigInteger;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.IBasicBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TotalWordCounter implements IBasicBolt {
-
-    private BigInteger total = BigInteger.ZERO;
-    private static final Logger LOG = 
LoggerFactory.getLogger(TotalWordCounter.class);
-    private static final Random RANDOM = new Random();
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context) 
{
-    }
-
-    /*
-     * Just output the word value with a count of 1.
-     */
-    @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        total = total.add(new BigInteger(input.getValues().get(1).toString()));
-        collector.emit(tuple(total.toString()));
-        //prints the total with low probability.
-        if (RANDOM.nextInt(1000) > 995) {
-            LOG.info("Running total = " + total);
-        }
-    }
-
-    @Override
-    public void cleanup() {
-        LOG.info("Final total = " + total);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("total"));
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-}
diff --git 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
 
b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
deleted file mode 100644
index 54205eab0..000000000
--- 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.topology;
-
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-
-public class UpdateWordCount {
-    private static final String WORD_SPOUT = "WORD_SPOUT";
-    private static final String COUNT_BOLT = "COUNT_BOLT";
-    private static final String UPDATE_BOLT = "UPDATE_BOLT";
-
-    private static final String TEST_MONGODB_URL = 
"mongodb://127.0.0.1:27017/test";
-    private static final String TEST_MONGODB_COLLECTION_NAME = "wordcount";
-    
-
-    public static void main(String[] args) throws Exception {
-        String url = TEST_MONGODB_URL;
-        String collectionName = TEST_MONGODB_COLLECTION_NAME;
-
-        if (args.length >= 2) {
-            url = args[0];
-            collectionName = args[1];
-        }
-
-        WordSpout spout = new WordSpout();
-        WordCounter bolt = new WordCounter();
-
-        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-        
-        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, 
filterCreator, mapper);
-
-        //if a new document should be inserted if there are no matches to the 
query filter
-        //updateBolt.withUpsert(true);
-
-        //whether find all documents according to the query filter
-        //updateBolt.withMany(true);
-
-        // wordSpout ==> countBolt ==> MongoUpdateBolt
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout(WORD_SPOUT, spout, 1);
-        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
-        builder.setBolt(UPDATE_BOLT, updateBolt, 1).fieldsGrouping(COUNT_BOLT, 
new Fields("word"));
-
-        String topoName = "test";
-        if (args.length == 3) {
-            topoName = args[2];
-        } else if (args.length > 3) {
-            System.out.println("Usage: UpdateWordCount <mongodb url> <mongodb 
collection> [topology name]");
-            return;
-        }
-        Config config = new Config();
-        StormSubmitter.submitTopology(topoName, config, 
builder.createTopology());
-    }
-}
diff --git 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java
 
b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java
deleted file mode 100644
index 05c04bc29..000000000
--- 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordCounter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.topology;
-
-import com.google.common.collect.Maps;
-
-import java.util.Map;
-
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.BasicOutputCollector;
-import org.apache.storm.topology.IBasicBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-
-public class WordCounter implements IBasicBolt {
-    private Map<String, Integer> wordCounter = Maps.newHashMap();
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context) 
{
-        
-    }
-
-    @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        String word = input.getStringByField("word");
-        int count;
-        if (wordCounter.containsKey(word)) {
-            count = wordCounter.get(word) + 1;
-            wordCounter.put(word, wordCounter.get(word) + 1);
-        } else {
-            count = 1;
-        }
-
-        wordCounter.put(word, count);
-        collector.emit(new Values(word, String.valueOf(count)));
-    }
-
-    @Override
-    public void cleanup() {
-
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("word", "count"));
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-}
diff --git 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java
 
b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java
deleted file mode 100644
index 96cab5e2a..000000000
--- 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/WordSpout.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.topology;
-
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class WordSpout implements IRichSpout {
-    boolean isDistributed;
-    SpoutOutputCollector collector;
-    public static final String[] words = new String[] { "apple", "orange", 
"pineapple", "banana", "watermelon" };
-
-    public WordSpout() {
-        this(true);
-    }
-
-    public WordSpout(boolean isDistributed) {
-        this.isDistributed = isDistributed;
-    }
-
-    public boolean isDistributed() {
-        return this.isDistributed;
-    }
-
-    @Override
-    public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
-    public void nextTuple() {
-        final Random rand = new Random();
-        final String word = words[rand.nextInt(words.length)];
-        this.collector.emit(new Values(word), UUID.randomUUID());
-        Thread.yield();
-    }
-
-    @Override
-    public void ack(Object msgId) {
-
-    }
-
-    @Override
-    public void fail(Object msgId) {
-
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("word"));
-    }
-
-    @Override
-    public void activate() {
-    }
-
-    @Override
-    public void deactivate() {
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-}
diff --git 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
 
b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
deleted file mode 100644
index 66410f381..000000000
--- 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/PrintFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident;
-
-import java.util.Random;
-
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PrintFunction extends BaseFunction {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(PrintFunction.class);
-
-    private static final Random RANDOM = new Random();
-
-    @Override
-    public void execute(TridentTuple tuple, TridentCollector tridentCollector) 
{
-        if (RANDOM.nextInt(1000) > 995) {
-            LOG.info(tuple.toString());
-        }
-    }
-}
diff --git 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
 
b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
deleted file mode 100644
index 858ea532c..000000000
--- 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident;
-
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
-import org.apache.storm.mongodb.trident.state.MongoState;
-import org.apache.storm.mongodb.trident.state.MongoStateFactory;
-import org.apache.storm.mongodb.trident.state.MongoStateQuery;
-import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class WordCountTrident {
-
-    public static StormTopology buildTopology(String url, String 
collectionName) {
-        Fields fields = new Fields("word", "count");
-        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-                new Values("storm", 1),
-                new Values("trident", 1),
-                new Values("needs", 1),
-                new Values("javadoc", 1)
-        );
-        spout.setCycle(true);
-
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-
-        MongoState.Options options = new MongoState.Options()
-                .withUrl(url)
-                .withCollectionName(collectionName)
-                .withMapper(mapper);
-
-        StateFactory factory = new MongoStateFactory(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        stream.partitionPersist(factory, fields,
-                new MongoStateUpdater(), new Fields());
-
-        TridentState state = topology.newStaticState(factory);
-        stream = stream.stateQuery(state, new Fields("word"),
-                new MongoStateQuery(), new Fields("columnName", 
"columnValue"));
-        stream.each(new Fields("word", "columnValue"), new PrintFunction(), 
new Fields());
-        return topology.build();
-    }
-
-    public static void main(String[] args) throws Exception {
-        Config conf = new Config();
-        conf.setMaxSpoutPending(5);
-        String topoName = "wordCounter";
-        if (args.length == 3) {
-            topoName = args[2];
-        } else if (args.length > 3 || args.length < 2) {
-            System.out.println("Usage: WordCountTrident <mongodb url> <mongodb 
collection> [topology name]");
-            return;
-        }
-        conf.setNumWorkers(3);
-        StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0], 
args[1]));
-    }
-
-}
diff --git 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
 
b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
deleted file mode 100644
index 18f79dc07..000000000
--- 
a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident;
-
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
-import org.apache.storm.mongodb.trident.state.MongoMapState;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentState;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.builtin.MapGet;
-import org.apache.storm.trident.operation.builtin.Sum;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class WordCountTridentMap {
-
-    public static StormTopology buildTopology(String url, String 
collectionName) {
-        Fields fields = new Fields("word", "count");
-        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-                new Values("storm", 1),
-                new Values("trident", 1),
-                new Values("needs", 1),
-                new Values("javadoc", 1)
-        );
-        spout.setCycle(true);
-
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-
-        MongoMapState.Options options = new MongoMapState.Options();
-        options.url = url;
-        options.collectionName = collectionName;
-        options.mapper = mapper;
-        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-        options.queryCreator = filterCreator;
-
-        StateFactory factory = MongoMapState.transactional(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        TridentState state = stream.groupBy(new Fields("word"))
-                .persistentAggregate(factory, new Fields("count"), new Sum(), 
new Fields("sum"));
-
-        stream.stateQuery(state, new Fields("word"), new MapGet(), new 
Fields("sum"))
-                .each(new Fields("word", "sum"), new PrintFunction(), new 
Fields());
-        return topology.build();
-    }
-
-    public static void main(String[] args) throws Exception {
-        Config conf = new Config();
-        conf.setMaxSpoutPending(5);
-        String topoName = "wordCounter";
-        if (args.length == 3) {
-            topoName = args[2];
-        } else if (args.length > 3 || args.length < 2) {
-            System.out.println("Usage: WordCountTrident <mongodb url> <mongodb 
collection> [topology name]");
-            return;
-        }
-        conf.setNumWorkers(3);
-        StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0], 
args[1]));
-    }
-
-}
diff --git a/external/storm-mongodb/README.md b/external/storm-mongodb/README.md
deleted file mode 100644
index 5c817107b..000000000
--- a/external/storm-mongodb/README.md
+++ /dev/null
@@ -1,325 +0,0 @@
-#Storm MongoDB
-
-Storm/Trident integration for [MongoDB](https://www.mongodb.org/). This 
package includes the core bolts and trident states that allows a storm topology 
to either insert storm tuples in a database collection or to execute 
select/update queries against a database collection in a storm topology.
-
-## Insert into Database
-The bolt and trident state included in this package for inserting data into a 
database collection.
-
-### MongoMapper
-The main API for inserting data in a collection using MongoDB is the 
`org.apache.storm.mongodb.common.mapper.MongoMapper` interface:
-
-```java
-public interface MongoMapper extends Serializable {
-    Document toDocument(ITuple tuple);
-    Document toDocumentByKeys(List<Object> keys);
-}
-```
-
-### SimpleMongoMapper
-`storm-mongodb` includes a general purpose `MongoMapper` implementation called 
`SimpleMongoMapper` that can map Storm tuple to a Database document.  
`SimpleMongoMapper` assumes that the storm tuple has fields with same name as 
the document field name in the database collection that you intend to write to.
-
-```java
-public class SimpleMongoMapper implements MongoMapper {
-    private String[] fields;
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for(String field : fields){
-            document.append(field, tuple.getValueByField(field));
-        }
-        return document;
-    }
-
-    @Override
-    public Document toDocumentByKeys(List<Object> keys) {
-        Document document = new Document();
-        document.append("_id", MongoUtils.getID(keys));
-        return document;
-    }
-
-    public SimpleMongoMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
-```
-
-### MongoInsertBolt
-To use the `MongoInsertBolt`, you construct an instance of it by specifying 
url, collectionName and a `MongoMapper` implementation that converts storm 
tuple to DB document. The following is the standard URI connection scheme:
- `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
-
-More options information(eg: Write Concern Options) about Mongo URI, you can 
visit 
https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options
-
- ```java
-String url = "mongodb://127.0.0.1:27017/test";
-String collectionName = "wordcount";
-
-MongoMapper mapper = new SimpleMongoMapper()
-        .withFields("word", "count");
-
-MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
- ```
-
-
-## Update from Database
-The bolt included in this package for updating data from a database collection.
-
-### MongoUpdateMapper
-The main API for updating data in a collection using MongoDB is the 
`org.apache.storm.mongodb.common.mapper.MongoUpdateMapper` interface:
-
-```java
-public interface MongoUpdateMapper extends MongoMapper { }
-```
-
-### SimpleMongoUpdateMapper
-`storm-mongodb` includes a general purpose `MongoUpdateMapper` implementation 
called `SimpleMongoUpdateMapper` that can map Storm tuple to a Database 
document. `SimpleMongoUpdateMapper` assumes that the storm tuple has fields 
with same name as the document field name in the database collection that you 
intend to write to.
-`SimpleMongoUpdateMapper` uses `$set` operator for setting the value of a 
field in a document. More information about update operator, you can visit 
-https://docs.mongodb.org/manual/reference/operator/update/
-
-```java
-public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements 
MongoUpdateMapper {
-
-    private String[] fields;
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for(String field : fields){
-            document.append(field, tuple.getValueByField(field));
-        }
-        //$set operator: Sets the value of a field in a document.
-        return new Document("$set", document);
-    }
-
-    public SimpleMongoUpdateMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
-```
-
-
-### QueryFilterCreator
-The main API for creating a MongoDB query Filter is the 
`org.apache.storm.mongodb.common.QueryFilterCreator` interface:
-
- ```java
-public interface QueryFilterCreator extends Serializable {
-    Bson createFilter(ITuple tuple);
-    Bson createFilterByKeys(List<Object> keys);
-}
- ```
-
-### SimpleQueryFilterCreator
-`storm-mongodb` includes a general purpose `QueryFilterCreator` implementation 
called `SimpleQueryFilterCreator` that can create a MongoDB query Filter by 
given Tuple.  `QueryFilterCreator` uses `$eq` operator for matching values that 
are equal to a specified value. More information about query operator, you can 
visit 
-https://docs.mongodb.org/manual/reference/operator/query/
-
- ```java
-public class SimpleQueryFilterCreator implements QueryFilterCreator {
-
-    private String field;
-    
-    @Override
-    public Bson createFilter(ITuple tuple) {
-        return Filters.eq(field, tuple.getValueByField(field));
-    }
-
-    @Override
-    public Bson createFilterByKeys(List<Object> keys) {
-        return Filters.eq("_id", MongoUtils.getID(keys));
-    }
-
-    public SimpleQueryFilterCreator withField(String field) {
-        this.field = field;
-        return this;
-    }
-
-}
- ```
-
-### MongoUpdateBolt
-To use the `MongoUpdateBolt`,  you construct an instance of it by specifying 
Mongo url, collectionName, a `QueryFilterCreator` implementation and a 
`MongoUpdateMapper` implementation that converts storm tuple to DB document.
-
- ```java
-        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-        
-        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, 
updateQueryCreator, mapper);
-
-        //if a new document should be inserted if there are no matches to the 
query filter
-        //updateBolt.withUpsert(true);
-
-        //whether find all documents according to the query filter
-        //updateBolt.withMany(true);
- ```
- 
- Or use a anonymous inner class implementation for `QueryFilterCreator`:
- 
-  ```java
-        MongoUpdateMapper mapper = new SimpleMongoUpdateMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
-            @Override
-            public Bson createFilter(ITuple tuple) {
-                return Filters.gt("count", 3);
-            }
-        };
-        
-        MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, 
updateQueryCreator, mapper);
-
-        //if a new document should be inserted if there are no matches to the 
query filter
-        //updateBolt.withUpsert(true);
- ```
-
-
-## Lookup from Database
-The bolt included in this package for selecting data from a database 
collection.
-
-### MongoLookupMapper
-The main API for selecting data in a collection using MongoDB is the 
`org.apache.storm.mongodb.common.mapper.MongoLookupMapper` interface:
-
-```java
-public interface MongoLookupMapper extends Serializable {
-
-    List<Values> toTuple(ITuple input, Document doc);
-
-    void declareOutputFields(OutputFieldsDeclarer declarer);
-}
-```
-
-### SimpleMongoLookupMapper
-`storm-mongodb` includes a general purpose `MongoLookupMapper` implementation 
called `SimpleMongoLookupMapper` that can converts a Mongo document to a list 
of storm values.
-
-```java
-public class SimpleMongoLookupMapper implements MongoLookupMapper {
-
-    private String[] fields;
-
-    @Override
-    public List<Values> toTuple(ITuple input, Document doc) {
-        Values values = new Values();
-
-        for(String field : fields) {
-            if(input.contains(field)) {
-                values.add(input.getValueByField(field));
-            } else {
-                values.add(doc.get(field));
-            }
-        }
-        List<Values> result = new ArrayList<Values>();
-        result.add(values);
-        return result;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(fields));
-    }
-
-    public SimpleMongoLookupMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-
-}
-```
-
-### MongoLookupBolt
-To use the `MongoLookupBolt`,  you construct an instance of it by specifying 
Mongo url, collectionName, a `QueryFilterCreator` implementation and a 
`MongoLookupMapper` implementation that converts a Mongo document to a list of 
storm values.
-
- ```java
-        MongoLookupMapper mapper = new SimpleMongoLookupMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-
-        MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, 
filterCreator, mapper);
- ```
-
-## Mongo Trident State&MapState
-### Trident State
-We support trident persistent state that can be used with trident topologies. 
To create a Mongo persistent trident state you need to initialize it with the 
url, collectionName, the `MongoMapper` instance. See the example below:
-
- ```java
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-
-        MongoState.Options options = new MongoState.Options()
-                .withUrl(url)
-                .withCollectionName(collectionName)
-                .withMapper(mapper);
-
-        StateFactory factory = new MongoStateFactory(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        stream.partitionPersist(factory, fields,
-                new MongoStateUpdater(), new Fields());
-
-        TridentState state = topology.newStaticState(factory);
-        stream = stream.stateQuery(state, new Fields("word"),
-                new MongoStateQuery(), new Fields("columnName", 
"columnValue"));
-        stream.each(new Fields("word", "columnValue"), new PrintFunction(), 
new Fields());
- ```
- **NOTE**:
- >If there is no unique index provided, trident state inserts in the case of 
failures may result in duplicate documents.
-
-### Trident MapState
-We also support trident `MapState`. To create a Mongo trident `MapState` you 
need to initialize it with the url, collectionName, the `MongoMapper` and 
`QueryFilterCreator` instance. See the example below:
-
- ```java
-        MongoMapper mapper = new SimpleMongoMapper()
-                .withFields("word", "count");
-
-        QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
-                .withField("word");
-
-        MongoMapState.Options options = new MongoMapState.Options();
-        options.url = url;
-        options.collectionName = collectionName;
-        options.mapper = mapper;
-        options.queryCreator = filterCreator;
-
-        StateFactory factory = MongoMapState.transactional(options);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        TridentState state = stream.groupBy(new Fields("word"))
-                .persistentAggregate(factory, new Fields("count"), new Sum(), 
new Fields("sum"));
-
-        stream.stateQuery(state, new Fields("word"), new MapGet(), new 
Fields("sum"))
-                .each(new Fields("word", "sum"), new PrintFunction(), new 
Fields());
- ```
- 
-
-## License
-
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
-## Committer Sponsors
-
- * Sriharsha Chintalapani ([[email protected]](mailto:[email protected]))
- * Xin Wang ([[email protected]](mailto:[email protected]))
- 
diff --git a/external/storm-mongodb/pom.xml b/external/storm-mongodb/pom.xml
deleted file mode 100644
index 66bf9fb9e..000000000
--- a/external/storm-mongodb/pom.xml
+++ /dev/null
@@ -1,82 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>2.6.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>storm-mongodb</artifactId>
-
-    <developers>
-        <developer>
-            <id>vesense</id>
-            <name>Xin Wang</name>
-            <email>[email protected]</email>
-        </developer>
-    </developers>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-client</artifactId>
-            <version>${project.version}</version>
-            <scope>${provided.scope}</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mongodb</groupId>
-            <artifactId>mongo-java-driver</artifactId>
-            <version>${mongodb.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-        </dependency>
-        <!--test dependencies -->
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest</artifactId>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-checkstyle-plugin</artifactId>
-                <!--Note - the version would be inherited-->
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-pmd-plugin</artifactId>
-            </plugin>
-        </plugins>
-    </build>
-</project>
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
deleted file mode 100644
index 13049de8b..000000000
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.MongoDbClient;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.base.BaseRichBolt;
-
-public abstract class AbstractMongoBolt extends BaseRichBolt {
-
-    private String url;
-    private String collectionName;
-
-    protected OutputCollector collector;
-    protected MongoDbClient mongoClient;
-
-    /**
-     * AbstractMongoBolt Constructor.
-     * @param url The MongoDB server url
-     * @param collectionName The collection where reading/writing data
-     */
-    public AbstractMongoBolt(String url, String collectionName) {
-        Validate.notEmpty(url, "url can not be blank or null");
-        Validate.notEmpty(collectionName, "collectionName can not be blank or 
null");
-
-        this.url = url;
-        this.collectionName = collectionName;
-    }
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context,
-            OutputCollector collector) {
-        this.collector = collector;
-        this.mongoClient = new MongoDbClient(url, collectionName);
-    }
-
-    @Override
-    public void cleanup() {
-        this.mongoClient.close();
-    }
-
-}
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
deleted file mode 100644
index 5d233dacb..000000000
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoInsertBolt.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.BatchHelper;
-import org.apache.storm.utils.TupleUtils;
-import org.bson.Document;
-
-/**
- * Basic bolt for writing to MongoDB.
- * Note: Each MongoInsertBolt defined in a topology is tied to a specific 
collection.
- */
-public class MongoInsertBolt extends AbstractMongoBolt {
-
-    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 1;
-
-    private MongoMapper mapper;
-
-    private boolean ordered = true;  //default is ordered.
-
-    private int batchSize;
-
-    private BatchHelper batchHelper;
-
-    private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
-
-    /**
-     * MongoInsertBolt Constructor.
-     * @param url The MongoDB server url
-     * @param collectionName The collection where reading/writing data
-     * @param mapper MongoMapper converting tuple to an MongoDB document
-     */
-    public MongoInsertBolt(String url, String collectionName, MongoMapper 
mapper) {
-        super(url, collectionName);
-
-        Validate.notNull(mapper, "MongoMapper can not be null");
-
-        this.mapper = mapper;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        try {
-            if (batchHelper.shouldHandle(tuple)) {
-                batchHelper.addBatch(tuple);
-            }
-
-            if (batchHelper.shouldFlush()) {
-                flushTuples();
-                batchHelper.ack();
-            }
-        } catch (Exception e) {
-            batchHelper.fail(e);
-        }
-    }
-
-    private void flushTuples() {
-        List<Document> docs = new LinkedList<>();
-        for (Tuple t : batchHelper.getBatchTuples()) {
-            Document doc = mapper.toDocument(t);
-            docs.add(doc);
-        }
-        mongoClient.insert(docs, ordered);
-    }
-
-    public MongoInsertBolt withBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-        return this;
-    }
-
-    public MongoInsertBolt withOrdered(boolean ordered) {
-        this.ordered = ordered;
-        return this;
-    }
-
-    public MongoInsertBolt withFlushIntervalSecs(int flushIntervalSecs) {
-        this.flushIntervalSecs = flushIntervalSecs;
-        return this;
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalSecs);
-    }
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context,
-            OutputCollector collector) {
-        super.prepare(topoConf, context, collector);
-        this.batchHelper = new BatchHelper(batchSize, collector);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        
-    }
-
-}
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
deleted file mode 100644
index 39e1512da..000000000
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoLookupBolt.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import java.util.List;
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.TupleUtils;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-/**
- * Basic bolt for querying from MongoDB.
- * Note: Each MongoLookupBolt defined in a topology is tied to a specific 
collection.
- */
-public class MongoLookupBolt extends AbstractMongoBolt {
-
-    private QueryFilterCreator queryCreator;
-    private MongoLookupMapper mapper;
-
-    /**
-     * MongoLookupBolt Constructor.
-     * @param url The MongoDB server url
-     * @param collectionName The collection where reading/writing data
-     * @param queryCreator QueryFilterCreator
-     * @param mapper MongoMapper converting tuple to an MongoDB document
-     */
-    public MongoLookupBolt(String url, String collectionName, 
QueryFilterCreator queryCreator, MongoLookupMapper mapper) {
-        super(url, collectionName);
-
-        Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
-        Validate.notNull(mapper, "MongoLookupMapper can not be null");
-
-        this.queryCreator = queryCreator;
-        this.mapper = mapper;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        if (TupleUtils.isTick(tuple)) {
-            return;
-        }
-
-        try {
-            //get query filter
-            Bson filter = queryCreator.createFilter(tuple);
-            //find document from mongodb
-            Document doc = mongoClient.find(filter);
-            //get storm values and emit
-            List<Values> valuesList = mapper.toTuple(tuple, doc);
-            for (Values values : valuesList) {
-                this.collector.emit(tuple, values);
-            }
-            this.collector.ack(tuple);
-        } catch (Exception e) {
-            this.collector.reportError(e);
-            this.collector.fail(tuple);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        mapper.declareOutputFields(declarer);
-    }
-
-}
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
deleted file mode 100644
index 4f92b3254..000000000
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.bolt;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-/**
- * Basic bolt for updating from MongoDB.
- * Note: Each MongoUpdateBolt defined in a topology is tied to a specific 
collection.
- */
-public class MongoUpdateBolt extends AbstractMongoBolt {
-
-    private QueryFilterCreator queryCreator;
-    private MongoUpdateMapper mapper;
-
-    private boolean upsert;  //the default is false.
-    private boolean many;  //the default is false.
-
-    /**
-     * MongoUpdateBolt Constructor.
-     * @param url The MongoDB server url
-     * @param collectionName The collection where reading/writing data
-     * @param queryCreator QueryFilterCreator
-     * @param mapper MongoMapper converting tuple to an MongoDB document
-     */
-    public MongoUpdateBolt(String url, String collectionName, 
QueryFilterCreator queryCreator, MongoUpdateMapper mapper) {
-        super(url, collectionName);
-
-        Validate.notNull(queryCreator, "QueryFilterCreator can not be null");
-        Validate.notNull(mapper, "MongoUpdateMapper can not be null");
-
-        this.queryCreator = queryCreator;
-        this.mapper = mapper;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        if (TupleUtils.isTick(tuple)) {
-            return;
-        }
-
-        try {
-            //get document
-            Document doc = mapper.toDocument(tuple);
-            //get query filter
-            Bson filter = queryCreator.createFilter(tuple);
-            mongoClient.update(filter, doc, upsert, many);
-            this.collector.ack(tuple);
-        } catch (Exception e) {
-            this.collector.reportError(e);
-            this.collector.fail(tuple);
-        }
-    }
-
-    public MongoUpdateBolt withUpsert(boolean upsert) {
-        this.upsert = upsert;
-        return this;
-    }
-
-    public MongoUpdateBolt withMany(boolean many) {
-        this.many = many;
-        return this;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        
-    }
-
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
deleted file mode 100644
index 52ed2373c..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDbClient.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.InsertManyOptions;
-import com.mongodb.client.model.UpdateOptions;
-
-import java.util.List;
-
-import org.bson.Document;
-import org.bson.conversions.Bson;
-
-public class MongoDbClient {
-
-    private MongoClient client;
-    private MongoCollection<Document> collection;
-
-    /**
-     * The MongoDbClient constructor.
-     * @param url The Mongo server url
-     * @param collectionName The Mongo collection to read/write data
-     */
-    public MongoDbClient(String url, String collectionName) {
-        //Creates a MongoURI from the given string.
-        MongoClientURI uri = new MongoClientURI(url);
-        //Creates a MongoClient described by a URI.
-        this.client = new MongoClient(uri);
-        //Gets a Database.
-        MongoDatabase db = client.getDatabase(uri.getDatabase());
-        //Gets a collection.
-        this.collection = db.getCollection(collectionName);
-    }
-
-    /**
-     * Inserts one or more documents.
-     * This method is equivalent to a call to the bulkWrite method.
-     * The documents will be inserted in the order provided, 
-     * stopping on the first failed insertion. 
-     * 
-     * @param documents documents
-     */
-    public void insert(List<Document> documents, boolean ordered) {
-        InsertManyOptions options = new InsertManyOptions();
-        if (!ordered) {
-            options.ordered(false);
-        }
-        collection.insertMany(documents, options);
-    }
-
-    /**
-     * Update a single or all documents in the collection according to the 
specified arguments.
-     * When upsert set to true, the new document will be inserted if there are 
no matches to the query filter.
-     * 
-     * @param filter Bson filter
-     * @param document Bson document
-     * @param upsert a new document should be inserted if there are no matches 
to the query filter
-     * @param many whether find all documents according to the query filter
-     */
-    public void update(Bson filter, Bson document, boolean upsert, boolean 
many) {
-        //TODO batch updating
-        UpdateOptions options = new UpdateOptions();
-        if (upsert) {
-            options.upsert(true);
-        }
-        if (many) {
-            collection.updateMany(filter, document, options);
-        } else {
-            collection.updateOne(filter, document, options);
-        }
-    }
-
-    /**
-     * Finds a single document in the collection according to the specified 
arguments.
-     *
-     * @param filter Bson filter
-     */
-    public Document find(Bson filter) {
-        //TODO batch finding
-        return collection.find(filter).first();
-    }
-
-    /**
-     * Closes all resources associated with this instance.
-     */
-    public void close() {
-        client.close();
-    }
-
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java
deleted file mode 100644
index 3ddd4d940..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoUtils.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.List;
-
-public final class MongoUtils {
-
-    /**
-     * Create Mongo _id based on input keys.
-     * @param keys keys
-     * @return Mongo _id
-     */
-    public static byte[] getId(List<Object> keys) {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        try {
-            for (Object key : keys) {
-                bos.write(String.valueOf(key).getBytes());
-            }
-            bos.close();
-        } catch (IOException e) {
-            throw new RuntimeException("IOException creating Mongo document 
_id.", e);
-        }
-        return bos.toByteArray();
-    }
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
deleted file mode 100644
index 521678e57..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/QueryFilterCreator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.conversions.Bson;
-
-/**
- * Create a MongoDB query Filter by given Tuple/trident keys.
- */
-public interface QueryFilterCreator extends Serializable {
-
-    /**
-     * Create a query Filter by given Tuple.
-     * 
-     * @param tuple ITuple tuple
-     * @return query Filter
-     */
-    Bson createFilter(ITuple tuple);
-
-    /**
-     * Create a query Filter by given trident keys.
-     *
-     * @param keys keys
-     * @return query Filter
-     */
-    Bson createFilterByKeys(List<Object> keys);
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
deleted file mode 100644
index 16a71ec60..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/SimpleQueryFilterCreator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common;
-
-import com.mongodb.client.model.Filters;
-
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.conversions.Bson;
-
-public class SimpleQueryFilterCreator implements QueryFilterCreator {
-
-    private String field;
-    
-    @Override
-    public Bson createFilter(ITuple tuple) {
-        return Filters.eq(field, tuple.getValueByField(field));
-    }
-
-    @Override
-    public Bson createFilterByKeys(List<Object> keys) {
-        return Filters.eq("_id", MongoUtils.getId(keys));
-    }
-
-    public SimpleQueryFilterCreator withField(String field) {
-        this.field = field;
-        return this;
-    }
-
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java
deleted file mode 100644
index 0d17820d3..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoLookupMapper.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.ITuple;
-import org.apache.storm.tuple.Values;
-import org.bson.Document;
-
-public interface MongoLookupMapper extends Serializable {
-
-    /**
-     * Converts a Mongo document to a list of storm values that can be 
emitted. This is done to allow a single
-     * storm input tuple and a single Mongo document to result in multiple 
output values.
-     * @param input the input tuple.
-     * @param doc the mongo document
-     * @return a List of storm values that can be emitted. Each item in list 
is emitted as an output tuple.
-     */
-    List<Values> toTuple(ITuple input, Document doc);
-
-    /**
-     * declare what are the fields that this code will output.
-     * @param declarer OutputFieldsDeclarer
-     */
-    void declareOutputFields(OutputFieldsDeclarer declarer);
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
deleted file mode 100644
index ed27c9269..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoMapper.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-/**
- * Given a Tuple/trident keys, converts it to an MongoDB document.
- */
-public interface MongoMapper extends Serializable {
-
-    /**
-     * Converts a Tuple to a Document.
-     * 
-     * @param tuple the incoming tuple
-     * @return the MongoDB document
-     */
-    Document toDocument(ITuple tuple);
-
-    /**
-     * Converts a keys to a Document.
-     *
-     * @param keys the trident keys
-     * @return the MongoDB document
-     */
-    Document toDocumentByKeys(List<Object> keys);
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java
deleted file mode 100644
index 6dcee15c2..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/MongoUpdateMapper.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-/**
- * MongoUpdateMapper is for defining spec. which is used for converting Tuple/ 
trident keys to an MongoDB document.
- */
-public interface MongoUpdateMapper extends MongoMapper {
-
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java
deleted file mode 100644
index 88407c35e..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoLookupMapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.ITuple;
-import org.apache.storm.tuple.Values;
-import org.bson.Document;
-
-public class SimpleMongoLookupMapper implements MongoLookupMapper {
-
-    private String[] fields;
-
-    public SimpleMongoLookupMapper(String... fields) {
-        this.fields = fields;
-    }
-
-    @Override
-    public List<Values> toTuple(ITuple input, Document doc) {
-        Values values = new Values();
-
-        for (String field : fields) {
-            if (input.contains(field)) {
-                values.add(input.getValueByField(field));
-            } else {
-                values.add(doc.get(field));
-            }
-        }
-        List<Values> result = new ArrayList<Values>();
-        result.add(values);
-        return result;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(fields));
-    }
-
-    public SimpleMongoLookupMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
deleted file mode 100644
index 1a3882821..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoMapper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import java.util.List;
-
-import org.apache.storm.mongodb.common.MongoUtils;
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-public class SimpleMongoMapper implements MongoMapper {
-
-    private String[] fields;
-
-    public SimpleMongoMapper(String... fields) {
-        this.fields = fields;
-    }
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for (String field : fields) {
-            document.append(field, tuple.getValueByField(field));
-        }
-        return document;
-    }
-
-    @Override
-    public Document toDocumentByKeys(List<Object> keys) {
-        Document document = new Document();
-        document.append("_id", MongoUtils.getId(keys));
-        return document;
-    }
-
-    public SimpleMongoMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
deleted file mode 100644
index 1c9cd41ef..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/mapper/SimpleMongoUpdateMapper.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.common.mapper;
-
-import org.apache.storm.tuple.ITuple;
-import org.bson.Document;
-
-public class SimpleMongoUpdateMapper extends SimpleMongoMapper implements 
MongoUpdateMapper {
-
-    private String[] fields;
-
-    public SimpleMongoUpdateMapper(String... fields) {
-        this.fields = fields;
-    }
-
-    @Override
-    public Document toDocument(ITuple tuple) {
-        Document document = new Document();
-        for (String field : fields) {
-            document.append(field, tuple.getValueByField(field));
-        }
-        //$set operator: Sets the value of a field in a document.
-        return new Document("$set", document);
-    }
-
-    @Override
-    public SimpleMongoUpdateMapper withFields(String... fields) {
-        this.fields = fields;
-        return this;
-    }
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
deleted file mode 100644
index f6cfc4341..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoMapState.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import com.google.common.collect.Maps;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.MongoDbClient;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
-import org.apache.storm.trident.state.JSONOpaqueSerializer;
-import org.apache.storm.trident.state.JSONTransactionalSerializer;
-import org.apache.storm.trident.state.OpaqueValue;
-import org.apache.storm.trident.state.Serializer;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-import org.apache.storm.trident.state.StateType;
-import org.apache.storm.trident.state.TransactionalValue;
-import org.apache.storm.trident.state.map.CachedMap;
-import org.apache.storm.trident.state.map.IBackingMap;
-import org.apache.storm.trident.state.map.MapState;
-import org.apache.storm.trident.state.map.NonTransactionalMap;
-import org.apache.storm.trident.state.map.OpaqueMap;
-import org.apache.storm.trident.state.map.SnapshottableMap;
-import org.apache.storm.trident.state.map.TransactionalMap;
-import org.apache.storm.tuple.Values;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MongoMapState<T> implements IBackingMap<T> {
-    private static Logger LOG = LoggerFactory.getLogger(MongoMapState.class);
-
-    @SuppressWarnings("rawtypes")
-    private static final Map<StateType, Serializer> DEFAULT_SERIALZERS = 
Maps.newHashMap();
-
-    static {
-        DEFAULT_SERIALZERS.put(StateType.NON_TRANSACTIONAL, new 
JSONNonTransactionalSerializer());
-        DEFAULT_SERIALZERS.put(StateType.TRANSACTIONAL, new 
JSONTransactionalSerializer());
-        DEFAULT_SERIALZERS.put(StateType.OPAQUE, new JSONOpaqueSerializer());
-    }
-
-    private Options<T> options;
-    private Serializer<T> serializer;
-    private MongoDbClient mongoClient;
-    private Map<String, Object> map;
-
-    protected MongoMapState(Map<String, Object> map, Options options) {
-        this.options = options;
-        this.map = map;
-        this.serializer = options.serializer;
-
-        Validate.notEmpty(options.url, "url can not be blank or null");
-        Validate.notEmpty(options.collectionName, "collectionName can not be 
blank or null");
-        Validate.notNull(options.queryCreator, "queryCreator can not be null");
-        Validate.notNull(options.mapper, "mapper can not be null");
-
-        this.mongoClient = new MongoDbClient(options.url, 
options.collectionName);
-    }
-
-    public static class Options<T> implements Serializable {
-        public String url;
-        public String collectionName;
-        public MongoMapper mapper;
-        public QueryFilterCreator queryCreator;
-        public Serializer<T> serializer;
-        public int cacheSize = 5000;
-        public String globalKey = "$MONGO-MAP-STATE-GLOBAL";
-        public String serDocumentField = "tridentSerField";
-    }
-
-
-    @SuppressWarnings("rawtypes")
-    public static StateFactory opaque() {
-        Options<OpaqueValue> options = new Options<OpaqueValue>();
-        return opaque(options);
-    }
-
-    @SuppressWarnings("rawtypes")
-    public static StateFactory opaque(Options<OpaqueValue> opts) {
-
-        return new Factory(StateType.OPAQUE, opts);
-    }
-
-    @SuppressWarnings("rawtypes")
-    public static StateFactory transactional() {
-        Options<TransactionalValue> options = new 
Options<TransactionalValue>();
-        return transactional(options);
-    }
-
-    @SuppressWarnings("rawtypes")
-    public static StateFactory transactional(Options<TransactionalValue> opts) 
{
-        return new Factory(StateType.TRANSACTIONAL, opts);
-    }
-
-    public static StateFactory nonTransactional() {
-        Options<Object> options = new Options<Object>();
-        return nonTransactional(options);
-    }
-
-    public static StateFactory nonTransactional(Options<Object> opts) {
-        return new Factory(StateType.NON_TRANSACTIONAL, opts);
-    }
-
-
-    protected static class Factory implements StateFactory {
-        private StateType stateType;
-        private Options options;
-
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        public Factory(StateType stateType, Options options) {
-            this.stateType = stateType;
-            this.options = options;
-
-            if (this.options.serializer == null) {
-                this.options.serializer = DEFAULT_SERIALZERS.get(stateType);
-            }
-
-            if (this.options.serializer == null) {
-                throw new RuntimeException("Serializer should be specified for 
type: " + stateType);
-            }
-        }
-
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        @Override
-        public State makeState(Map<String, Object> conf, IMetricsContext 
metrics, int partitionIndex, int numPartitions) {
-            IBackingMap state = new MongoMapState(conf, options);
-
-            if (options.cacheSize > 0) {
-                state = new CachedMap(state, options.cacheSize);
-            }
-
-            MapState mapState;
-            switch (stateType) {
-                case NON_TRANSACTIONAL:
-                    mapState = NonTransactionalMap.build(state);
-                    break;
-                case OPAQUE:
-                    mapState = OpaqueMap.build(state);
-                    break;
-                case TRANSACTIONAL:
-                    mapState = TransactionalMap.build(state);
-                    break;
-                default:
-                    throw new IllegalArgumentException("Unknown state type: " 
+ stateType);
-            }
-            return new SnapshottableMap(mapState, new 
Values(options.globalKey));
-        }
-
-    }
-
-    @Override
-    public List<T> multiGet(List<List<Object>> keysList) {
-        List<T> retval = new ArrayList<>();
-        try {
-            for (List<Object> keys : keysList) {
-                Bson filter = options.queryCreator.createFilterByKeys(keys);
-                Document doc = mongoClient.find(filter);
-                if (doc != null) {
-                    retval.add(this.serializer.deserialize((byte[]) 
doc.get(options.serDocumentField)));
-                } else {
-                    retval.add(null);
-                }
-            }
-        } catch (Exception e) {
-            LOG.warn("Batch get operation failed.", e);
-            throw new FailedException(e);
-        }
-        return retval;
-    }
-
-    @Override
-    public void multiPut(List<List<Object>> keysList, List<T> values) {
-        try {
-            for (int i = 0; i < keysList.size(); i++) {
-                List<Object> keys = keysList.get(i);
-                T value = values.get(i);
-                Bson filter = options.queryCreator.createFilterByKeys(keys);
-                Document document = options.mapper.toDocumentByKeys(keys);
-                document.append(options.serDocumentField, 
this.serializer.serialize(value));
-                this.mongoClient.update(filter, document, true, false);
-            }
-        } catch (Exception e) {
-            LOG.warn("Batch write operation failed.", e);
-            throw new FailedException(e);
-        }
-    }
-}
diff --git 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
 
b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
deleted file mode 100644
index 100f9313a..000000000
--- 
a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import com.google.common.collect.Lists;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.Validate;
-import org.apache.storm.mongodb.common.MongoDbClient;
-import org.apache.storm.mongodb.common.QueryFilterCreator;
-import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
-import org.apache.storm.mongodb.common.mapper.MongoMapper;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MongoState implements State {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
-
-    private Options options;
-    private MongoDbClient mongoClient;
-    private Map<String, Object> map;
-
-    protected MongoState(Map<String, Object> map, Options options) {
-        this.options = options;
-        this.map = map;
-    }
-
-    public static class Options implements Serializable {
-        private String url;
-        private String collectionName;
-        private MongoMapper mapper;
-        private MongoLookupMapper lookupMapper;
-        private QueryFilterCreator queryCreator;
-
-        public Options withUrl(String url) {
-            this.url = url;
-            return this;
-        }
-
-        public Options withCollectionName(String collectionName) {
-            this.collectionName = collectionName;
-            return this;
-        }
-
-        public Options withMapper(MongoMapper mapper) {
-            this.mapper = mapper;
-            return this;
-        }
-
-        public Options withMongoLookupMapper(MongoLookupMapper lookupMapper) {
-            this.lookupMapper = lookupMapper;
-            return this;
-        }
-
-        public Options withQueryFilterCreator(QueryFilterCreator queryCreator) {
-            this.queryCreator = queryCreator;
-            return this;
-        }
-    }
-
-    protected void prepare() {
-        Validate.notEmpty(options.url, "url can not be blank or null");
-        Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
-
-        this.mongoClient = new MongoDbClient(options.url, options.collectionName);
-    }
-
-    @Override
-    public void beginCommit(Long txid) {
-        LOG.debug("beginCommit is noop.");
-    }
-
-    @Override
-    public void commit(Long txid) {
-        LOG.debug("commit is noop.");
-    }
-
-    /**
-     * Update Mongo state.
-     * @param tuples trident tuples
-     * @param collector trident collector
-     */
-    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
-        List<Document> documents = Lists.newArrayList();
-        for (TridentTuple tuple : tuples) {
-            Document document = options.mapper.toDocument(tuple);
-            documents.add(document);
-        }
-
-        try {
-            this.mongoClient.insert(documents, true);
-        } catch (Exception e) {
-            LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
-            throw new FailedException(e);
-        }
-    }
-
-    /**
-     * Batch retrieve values.
-     * @param tridentTuples trident tuples
-     * @return values
-     */
-    public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
-        List<List<Values>> batchRetrieveResult = Lists.newArrayList();
-        try {
-            for (TridentTuple tuple : tridentTuples) {
-                Bson filter = options.queryCreator.createFilter(tuple);
-                Document doc = mongoClient.find(filter);
-                List<Values> values = options.lookupMapper.toTuple(tuple, doc);
-                batchRetrieveResult.add(values);
-            }
-        } catch (Exception e) {
-            LOG.warn("Batch get operation failed. Triggering replay.", e);
-            throw new FailedException(e);
-        }
-        return batchRetrieveResult;
-    }
-}
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
deleted file mode 100644
index d27d9a92d..000000000
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import java.util.Map;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-public class MongoStateFactory implements StateFactory {
-
-    private MongoState.Options options;
-
-    public MongoStateFactory(MongoState.Options options) {
-        this.options = options;
-    }
-
-    @Override
-    public State makeState(Map<String, Object> conf, IMetricsContext metrics,
-            int partitionIndex, int numPartitions) {
-        MongoState state = new MongoState(conf, options);
-        state.prepare();
-        return state;
-    }
-
-}
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java
deleted file mode 100644
index 2cf46cff2..000000000
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateQuery.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import java.util.List;
-
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseQueryFunction;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Values;
-
-public class MongoStateQuery extends BaseQueryFunction<MongoState, List<Values>> {
-
-    @Override
-    public List<List<Values>> batchRetrieve(MongoState mongoState, List<TridentTuple> tridentTuples) {
-        return mongoState.batchRetrieve(tridentTuples);
-    }
-
-    @Override
-    public void execute(TridentTuple tuples, List<Values> values, TridentCollector tridentCollector) {
-        for (Values value : values) {
-            tridentCollector.emit(value);
-        }
-    }
-}
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
deleted file mode 100644
index 41311b3c3..000000000
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoStateUpdater.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.mongodb.trident.state;
-
-import java.util.List;
-
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class MongoStateUpdater extends BaseStateUpdater<MongoState>  {
-
-    @Override
-    public void updateState(MongoState state, List<TridentTuple> tuples,
-            TridentCollector collector) {
-        state.updateState(tuples, collector);
-    }
-
-}
diff --git a/pom.xml b/pom.xml
index b460324d4..8d04950e1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -496,7 +496,6 @@
                 <module>external/storm-solr</module>
                 <module>external/storm-metrics</module>
                 <module>external/storm-mqtt</module>
-                <module>external/storm-mongodb</module>
                 <module>external/storm-kafka-client</module>
                 <module>external/storm-kafka-migration</module>
                 <module>external/storm-opentsdb</module>
@@ -520,7 +519,6 @@
             <modules>
                 <module>examples/storm-starter</module>
                 <module>examples/storm-loadgen</module>
-                <module>examples/storm-mongodb-examples</module>
                 <module>examples/storm-redis-examples</module>
                 <module>examples/storm-opentsdb-examples</module>
                 <module>examples/storm-solr-examples</module>

Reply via email to