Author: sblackmon
Date: Wed Feb 19 01:52:40 2014
New Revision: 1569602

URL: http://svn.apache.org/r1569602
Log:
Initial implementation of mongo writer

Added:
    incubator/streams/trunk/streams-contrib/streams-persist-mongo/
    incubator/streams/trunk/streams-contrib/streams-persist-mongo/pom.xml
    incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/
    incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/
    incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/
    
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties

Added: incubator/streams/trunk/streams-contrib/streams-persist-mongo/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/pom.xml?rev=1569602&view=auto
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-mongo/pom.xml 
(added)
+++ incubator/streams/trunk/streams-contrib/streams-persist-mongo/pom.xml Wed 
Feb 19 01:52:40 2014
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>streams-contrib</artifactId>
+        <groupId>org.apache.streams</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-persist-mongo</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongo-java-driver</artifactId>
+            <version>2.12.0-rc0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.mongo.pojo</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>false</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java?rev=1569602&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
 Wed Feb 19 01:52:40 2014
@@ -0,0 +1,31 @@
+package org.apache.streams.mongo;
+
+import com.google.common.base.Objects;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds a {@link MongoConfiguration} from a typesafe {@link Config} section.
+ *
+ * <p>Created by sblackmon on 12/10/13.
+ */
+public class MongoConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MongoConfigurator.class);
+
+    /**
+     * Copies the Mongo connection settings out of the supplied config section.
+     *
+     * <p>The keys {@code host}, {@code port}, {@code db} and {@code collection}
+     * are read unconditionally; {@code user} and {@code password} are copied
+     * only when the section has a value at that path.
+     *
+     * @param mongo config section holding the Mongo settings
+     * @return a newly created configuration populated from {@code mongo}
+     */
+    public static MongoConfiguration detectConfiguration(Config mongo) {
+
+        MongoConfiguration mongoConfiguration = new MongoConfiguration();
+
+        mongoConfiguration.setHost(mongo.getString("host"));
+        // Long.valueOf rather than new Long(...): no needless allocation.
+        mongoConfiguration.setPort(Long.valueOf(mongo.getInt("port")));
+        mongoConfiguration.setDb(mongo.getString("db"));
+        mongoConfiguration.setCollection(mongo.getString("collection"));
+
+        if( mongo.hasPath("user")) {
+            mongoConfiguration.setUser(mongo.getString("user"));
+        }
+        if( mongo.hasPath("password")) {
+            mongoConfiguration.setPassword(mongo.getString("password"));
+        }
+        return mongoConfiguration;
+    }
+
+}

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java?rev=1569602&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
 Wed Feb 19 01:52:40 2014
@@ -0,0 +1,163 @@
+package org.apache.streams.mongo;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import com.mongodb.DB;
+import com.mongodb.DBAddress;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.util.JSON;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class MongoPersistWriter implements StreamsPersistWriter, Runnable
+{
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(MongoPersistWriter.class);
+
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    private MongoConfiguration config;
+
+    protected DB client;
+    protected DBAddress dbaddress;
+    protected DBCollection collection;
+
+    protected List<DBObject> insertBatch = new ArrayList<DBObject>();
+
+    public MongoPersistWriter() {
+        Config config = StreamsConfigurator.config.getConfig("mongo");
+        this.config = MongoConfigurator.detectConfiguration(config);
+        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+    }
+
+    public MongoPersistWriter(Queue<StreamsDatum> persistQueue) {
+        Config config = StreamsConfigurator.config.getConfig("mongo");
+        this.config = MongoConfigurator.detectConfiguration(config);
+        this.persistQueue = persistQueue;
+    }
+
+    private synchronized void connectToMongo()
+    {
+        try {
+            dbaddress = new DBAddress(config.getHost(), 
config.getPort().intValue(), config.getDb());
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        client = MongoClient.connect(dbaddress);
+
+        if( !Strings.isNullOrEmpty(config.getUser()) && 
!Strings.isNullOrEmpty(config.getPassword()))
+            client.authenticate(config.getUser(), 
config.getPassword().toCharArray());
+
+        if( !client.collectionExists(config.getCollection())) {
+            client.createCollection(config.getCollection(), null);
+        };
+
+        collection = client.getCollection(config.getCollection());
+    }
+    
+    @Override
+    public void write(StreamsDatum streamsDatum) {
+
+        DBObject dbObject;
+        if( streamsDatum.getDocument() instanceof String ) {
+            dbObject = (DBObject) 
JSON.parse((String)streamsDatum.getDocument());
+        } else {
+            try {
+                ObjectNode node = 
mapper.valueToTree(streamsDatum.getDocument());
+                dbObject = (DBObject) JSON.parse(node.toString());
+            } catch (Exception e) {
+                e.printStackTrace();
+                LOGGER.warn("Unsupported type: " + 
streamsDatum.getDocument().getClass());
+                return;
+            }
+        }
+
+        insertBatch.add(dbObject);
+
+        if( insertBatch.size() % 100 == 0)
+            try {
+                flush();
+            } catch (IOException e) {
+                e.printStackTrace();
+                return;
+            }
+    }
+
+    public void flush() throws IOException
+    {
+        collection.insert(insertBatch);
+    }
+
+    public synchronized void close() throws IOException
+    {
+        client.cleanCursors(true);
+    }
+
+    @Override
+    public void start() {
+
+        connectToMongo();
+
+    }
+
+    @Override
+    public void stop() {
+
+        try {
+            flush();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        try {
+            close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+        this.persistQueue = persistQueue;
+    }
+
+    @Override
+    public Queue<StreamsDatum> getPersistQueue() {
+        return persistQueue;
+    }
+
+
+    @Override
+    public void run() {
+
+        start();
+
+        Thread task = new Thread(new MongoPersistWriterTask(this));
+        task.start();
+
+        try {
+            task.join();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        stop();
+    }
+}

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java?rev=1569602&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
 Wed Feb 19 01:52:40 2014
@@ -0,0 +1,38 @@
+package org.apache.streams.mongo;
+
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/**
+ * Drains the persist queue of a {@link MongoPersistWriter}, handing each datum
+ * to {@link MongoPersistWriter#write(StreamsDatum)} until the running thread
+ * is interrupted.
+ */
+public class MongoPersistWriterTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistWriterTask.class);
+
+    /** How long to sleep, in milliseconds, when the queue is empty. */
+    private static final long IDLE_SLEEP_MS = 1L;
+
+    private MongoPersistWriter writer;
+
+    /**
+     * @param writer writer whose queue is drained and whose {@code write} is called
+     */
+    public MongoPersistWriterTask(MongoPersistWriter writer) {
+        this.writer = writer;
+    }
+
+    /**
+     * Polls the writer's queue in a loop, writing each datum it finds and
+     * sleeping briefly when the queue is empty.
+     *
+     * <p>A failure while writing one datum is logged and does not stop the
+     * loop. Interrupting the thread ends the loop, leaving the interrupt flag
+     * set.
+     */
+    @Override
+    public void run() {
+
+        while( !Thread.currentThread().isInterrupted() ) {
+            // poll() fetches and removes in one step, so there is no gap
+            // between the emptiness check and the removal.
+            StreamsDatum entry = writer.getPersistQueue().poll();
+            if( entry != null ) {
+                try {
+                    writer.write(entry);
+                } catch (Exception e) {
+                    LOGGER.warn("Unable to write datum", e);
+                }
+                continue;
+            }
+            try {
+                // Back off only when idle instead of spinning on sleep(0).
+                Thread.sleep(IDLE_SLEEP_MS);
+            } catch (InterruptedException e) {
+                // Restore the flag so the loop condition ends the task.
+                Thread.currentThread().interrupt();
+            }
+        }
+
+    }
+
+}

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json?rev=1569602&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json
 Wed Feb 19 01:52:40 2014
@@ -0,0 +1,33 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.mongo.MongoConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "host": {
+            "type": "string",
+            "description": "Mongo host"
+        },
+        "port": {
+            "type": "integer",
+            "description": "Mongo port"
+        },
+        "user": {
+            "type": "string",
+            "description": "User"
+        },
+        "password": {
+            "type": "string",
+            "description": "Password"
+        },
+        "db": {
+            "type": "string",
+            "description": "DB"
+        },
+        "collection": {
+            "type": "string",
+            "description": "Collection"
+        }
+    }
+}
\ No newline at end of file

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties?rev=1569602&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
 Wed Feb 19 01:52:40 2014
@@ -0,0 +1,10 @@
+hbase.rootdir = "hdfs://localhost:8020/hbase"
+
+zookeeper.znode.parent = "/hbase"
+
+zookeeper.znode.rootserver = "localhost"
+
+hbase.zookeeper.quorum = "localhost"
+
+hbase.zookeeper.property.clientPort = 2181
+


Reply via email to