Author: sblackmon
Date: Wed Feb 19 01:52:40 2014
New Revision: 1569602
URL: http://svn.apache.org/r1569602
Log:
Initial implementation of mongo writer
Added:
incubator/streams/trunk/streams-contrib/streams-persist-mongo/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/pom.xml
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
Added: incubator/streams/trunk/streams-contrib/streams-persist-mongo/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/pom.xml?rev=1569602&view=auto
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-mongo/pom.xml
(added)
+++ incubator/streams/trunk/streams-contrib/streams-persist-mongo/pom.xml Wed
Feb 19 01:52:40 2014
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>streams-contrib</artifactId>
+ <groupId>org.apache.streams</groupId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-persist-mongo</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <version>2.12.0-rc0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+
<source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+
<sourcePath>src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json</sourcePath>
+ </sourcePaths>
+
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+
<targetPackage>org.apache.streams.mongo.pojo</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>false</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
Added:
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
URL:
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java?rev=1569602&view=auto
==============================================================================
---
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
(added)
+++
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoConfigurator.java
Wed Feb 19 01:52:40 2014
@@ -0,0 +1,31 @@
+package org.apache.streams.mongo;
+
+import com.google.common.base.Objects;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class MongoConfigurator {
+
+ private final static Logger LOGGER =
LoggerFactory.getLogger(MongoConfigurator.class);
+
+ public static MongoConfiguration detectConfiguration(Config mongo) {
+
+ MongoConfiguration mongoConfiguration = new MongoConfiguration();
+
+ mongoConfiguration.setHost(mongo.getString("host"));
+ mongoConfiguration.setPort(new Long(mongo.getInt("port")));
+ mongoConfiguration.setDb(mongo.getString("db"));
+ mongoConfiguration.setCollection(mongo.getString("collection"));
+
+ if( mongo.hasPath("user"))
+ mongoConfiguration.setUser(mongo.getString("user"));
+ if( mongo.hasPath("password"))
+ mongoConfiguration.setPassword(mongo.getString("password"));
+ return mongoConfiguration;
+ }
+
+}
Added:
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
URL:
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java?rev=1569602&view=auto
==============================================================================
---
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
(added)
+++
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
Wed Feb 19 01:52:40 2014
@@ -0,0 +1,163 @@
+package org.apache.streams.mongo;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import com.mongodb.DB;
+import com.mongodb.DBAddress;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.util.JSON;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class MongoPersistWriter implements StreamsPersistWriter, Runnable
+{
+ private final static Logger LOGGER =
LoggerFactory.getLogger(MongoPersistWriter.class);
+
+ protected volatile Queue<StreamsDatum> persistQueue;
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ private MongoConfiguration config;
+
+ protected DB client;
+ protected DBAddress dbaddress;
+ protected DBCollection collection;
+
+ protected List<DBObject> insertBatch = new ArrayList<DBObject>();
+
+ public MongoPersistWriter() {
+ Config config = StreamsConfigurator.config.getConfig("mongo");
+ this.config = MongoConfigurator.detectConfiguration(config);
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
+
+ public MongoPersistWriter(Queue<StreamsDatum> persistQueue) {
+ Config config = StreamsConfigurator.config.getConfig("mongo");
+ this.config = MongoConfigurator.detectConfiguration(config);
+ this.persistQueue = persistQueue;
+ }
+
+ private synchronized void connectToMongo()
+ {
+ try {
+ dbaddress = new DBAddress(config.getHost(),
config.getPort().intValue(), config.getDb());
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ client = MongoClient.connect(dbaddress);
+
+ if( !Strings.isNullOrEmpty(config.getUser()) &&
!Strings.isNullOrEmpty(config.getPassword()))
+ client.authenticate(config.getUser(),
config.getPassword().toCharArray());
+
+ if( !client.collectionExists(config.getCollection())) {
+ client.createCollection(config.getCollection(), null);
+ };
+
+ collection = client.getCollection(config.getCollection());
+ }
+
+ @Override
+ public void write(StreamsDatum streamsDatum) {
+
+ DBObject dbObject;
+ if( streamsDatum.getDocument() instanceof String ) {
+ dbObject = (DBObject)
JSON.parse((String)streamsDatum.getDocument());
+ } else {
+ try {
+ ObjectNode node =
mapper.valueToTree(streamsDatum.getDocument());
+ dbObject = (DBObject) JSON.parse(node.toString());
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Unsupported type: " +
streamsDatum.getDocument().getClass());
+ return;
+ }
+ }
+
+ insertBatch.add(dbObject);
+
+ if( insertBatch.size() % 100 == 0)
+ try {
+ flush();
+ } catch (IOException e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+
+ public void flush() throws IOException
+ {
+ collection.insert(insertBatch);
+ }
+
+ public synchronized void close() throws IOException
+ {
+ client.cleanCursors(true);
+ }
+
+ @Override
+ public void start() {
+
+ connectToMongo();
+
+ }
+
+ @Override
+ public void stop() {
+
+ try {
+ flush();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
+
+ @Override
+ public Queue<StreamsDatum> getPersistQueue() {
+ return persistQueue;
+ }
+
+
+ @Override
+ public void run() {
+
+ start();
+
+ Thread task = new Thread(new MongoPersistWriterTask(this));
+ task.start();
+
+ try {
+ task.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ stop();
+ }
+}
Added:
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
URL:
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java?rev=1569602&view=auto
==============================================================================
---
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
(added)
+++
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriterTask.java
Wed Feb 19 01:52:40 2014
@@ -0,0 +1,38 @@
+package org.apache.streams.mongo;
+
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+public class MongoPersistWriterTask implements Runnable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MongoPersistWriterTask.class);
+
+ private MongoPersistWriter writer;
+
+ public MongoPersistWriterTask(MongoPersistWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void run() {
+
+ while(true) {
+ if( writer.getPersistQueue().peek() != null ) {
+ try {
+ StreamsDatum entry = writer.persistQueue.remove();
+ writer.write(entry);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ Thread.sleep(new Random().nextInt(1));
+ } catch (InterruptedException e) {}
+ }
+
+ }
+
+}
Added:
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json
URL:
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json?rev=1569602&view=auto
==============================================================================
---
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json
(added)
+++
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/jsonschema/org/apache/streams/mongo/MongoConfiguration.json
Wed Feb 19 01:52:40 2014
@@ -0,0 +1,33 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.mongo.MongoConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "host": {
+ "type": "string",
+ "description": "MongoDB host"
+ },
+ "port": {
+ "type": "integer",
+ "description": "MongoDB port"
+ },
+ "user": {
+ "type": "string",
+ "description": "User"
+ },
+ "password": {
+ "type": "string",
+ "description": "Password"
+ },
+ "db": {
+ "type": "string",
+ "description": "DB"
+ },
+ "collection": {
+ "type": "string",
+ "description": "Collection"
+ }
+ }
+}
\ No newline at end of file
Added:
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
URL:
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties?rev=1569602&view=auto
==============================================================================
---
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
(added)
+++
incubator/streams/trunk/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
Wed Feb 19 01:52:40 2014
@@ -0,0 +1,10 @@
+hbase.rootdir = "hdfs://localhost:8020/hbase"
+
+zookeeper.znode.parent = "/hbase"
+
+zookeeper.znode.rootserver = "localhost"
+
+hbase.zookeeper.quorum = "localhost"
+
+hbase.zookeeper.property.clientPort = 2181
+