[ 
https://issues.apache.org/jira/browse/FLINK-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688168#comment-16688168
 ] 

ASF GitHub Bot commented on FLINK-3967:
---------------------------------------

zentol closed pull request #2031: FLINK-3967 - Flink Sink for Rethink Db
URL: https://github.com/apache/flink/pull/2031
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-streaming-connectors/flink-connector-rethinkdb/pom.xml 
b/flink-streaming-connectors/flink-connector-rethinkdb/pom.xml
new file mode 100644
index 00000000000..13177e76a0e
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-rethinkdb/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-streaming-connectors</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-rethinkdb_2.10</artifactId>
+       <name>flink-connector-rethinkdb</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->
+       <properties>
+               <rethinkdb.version>2.3.0</rethinkdb.version>
+       </properties>
+
+       <dependencies>
+
+               <!-- Flink streaming API; scope "provided", so it is not bundled with this connector -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- NOTE(review): guava.version is not defined in this file; presumably inherited from a parent POM -->
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+
+               <!-- Test classes (test-jar) of the streaming API; a different artifact type than the entry above -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- RethinkDB Java driver; version is controlled by the rethinkdb.version property above -->
+               <dependency>
+                       <groupId>com.rethinkdb</groupId>
+                       <artifactId>rethinkdb-driver</artifactId>
+                       <version>${rethinkdb.version}</version>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <!-- Re-run a failing test up to three times before reporting it as failed -->
+                                       <rerunFailingTestsCount>3</rerunFailingTestsCount>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/ConflictStrategy.java
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/ConflictStrategy.java
new file mode 100644
index 00000000000..fb0099c4aa0
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/ConflictStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb;
+
+/**
+ * Conflict resolution strategy used when the id of the document being inserted
+ * matches the id of a document that already exists in RethinkDB.  By default
+ * RethinkDB uses the "id" field as the id for the documents.
+ *
+ * <p>{@link RethinkDBSink} sends the constant's string form as the value of the
+ * insert's "conflict" option, so renaming a constant changes the value sent to
+ * the database.
+ */
+public enum ConflictStrategy {
+       
+       /**
+        * Conflict resolution option to update the document with the conflicting id
+        */
+       update,
+       
+       /**
+        * Conflict resolution option to produce an error on a conflicting id
+        */
+       error,
+
+       /**
+        * Conflict resolution option to replace the document on a conflicting id
+        */
+       replace;
+
+}
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/Durability.java
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/Durability.java
new file mode 100644
index 00000000000..f6bafcf16ad
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/Durability.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb;
+
+/**
+ * Durability configuration for writing documents.
+ *
+ * <p>{@link RethinkDBSink} sends the constant's name as the value of the
+ * insert's "durability" option, so renaming a constant changes the value sent
+ * to the database.
+ */
+public enum Durability {
+       
+       /**
+        * Hard durability implies the response is sent to the client after the
+        * data is saved to disk
+        */
+       hard,
+       
+       /**
+        * Soft durability implies the server acknowledges immediately after
+        * receiving the document
+        */
+       soft;
+}
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/JSONArraySerializationSchema.java
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/JSONArraySerializationSchema.java
new file mode 100644
index 00000000000..3030e774c9d
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/JSONArraySerializationSchema.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb;
+
+import org.json.simple.JSONArray;
+
+/**
+ * Pass-through serialization schema for inputs that already are a
+ * {@link org.json.simple.JSONArray}: the input object is returned unchanged.
+ * 
+ * @see JSONSerializationSchema
+ * 
+ * @see org.json.simple.JSONArray
+ * @see org.json.simple.JSONObject
+ */
+public class JSONArraySerializationSchema implements 
JSONSerializationSchema<JSONArray>{
+
+       /**
+        * Serial version of the class
+        */
+       private static final long serialVersionUID = -1911725512036206214L;
+
+       /**
+        * Returns the given JSON array as is; no conversion or copy is made.
+        * 
+        * @param input json array (a null input is returned as null)
+        * @return the very object that was passed in
+        * 
+        * @throws Exception declared by the interface; nothing in this
+        *         implementation throws
+        */
+       @Override
+       public Object toJSON(JSONArray input) throws Exception {
+               return input;
+       }
+
+}
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/JSONSerializationSchema.java
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/JSONSerializationSchema.java
new file mode 100644
index 00000000000..2c3473ea317
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/JSONSerializationSchema.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb;
+
+import java.io.Serializable;
+
+/**
+ * Interface implemented by transformers that convert an input into a
+ * {@link org.json.simple.JSONArray} or {@link org.json.simple.JSONObject}.
+ *
+ * <p>The interface extends {@link Serializable}; {@link RethinkDBSink}, which is
+ * itself serializable, keeps its schema in a non-transient field.
+ * 
+ * @param <T> the input type
+ * 
+ * @see StringJSONSerializationSchema
+ */
+public interface JSONSerializationSchema<T> extends Serializable{
+       
+       /**
+        * Transform input into a JSON object or JSON array.
+        *
+        * @param input the value to transform
+        * @return json array or object
+        * @throws Exception if there is a problem while transforming the object
+        */
+       public Object toJSON(T input) throws Exception;
+}
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/RethinkDBSink.java
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/RethinkDBSink.java
new file mode 100644
index 00000000000..ecd5a417cca
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/RethinkDBSink.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Objects;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rethinkdb.RethinkDB;
+import com.rethinkdb.gen.ast.Insert;
+import com.rethinkdb.gen.ast.Table;
+import com.rethinkdb.net.Connection;
+
+/**
+ * This class is the Flink sink for RethinkDB which is a tcp/JSON protocol 
based document
+ * oriented NoSQL database.
+ * 
+ * <p/>
+ * This sink provides two constuctors:
+ * <p/>
+ * {@link #FlinkRethinkDbSink(String hostname, int hostport, String database, 
String table, JSONSerializationSchema schema)}, and
+ * <p/>
+ * {@link #FlinkRethinkDbSink(String hostname, int hostport, String database, 
String table, JSONSerializationSchema schema, ConflictStrategy 
conflictStrategy)}
+ * <p/>
+ * 
+ * The parameter for the constructor are as follows:
+ * <p/>
+ * <ul>
+ * <li>hostname - the rethinkdb hostname</li>
+ * <li>hosport - the rethinkdb port for the driver to connect</li>
+ * <li>database - the rethinkdb database name to which the table belongs</li>
+ * <li>table - the rethinkdb table name where documents are inserted</li>
+ * <li>schema - the schema tranfromer that converts input to JSONObject, or 
JSONArray</li>
+ * <li>conflictStrategy - the conflict resolution strategy in case inserted 
document has id which exists in the db</li>
+ * </ul>
+ * <p/>
+ *
+ * The user can also set:
+ * <p/>
+ * <ul>
+ * <li>username - default is admin</li>
+ * <li>password - default is blank</li>
+ * </ul>
+ * <p/> with the {@link #setUsernameAndPassword(String, String)} method.
+ * <p/>
+ * <b>NOTE: If multiple documents are getting inserted (eg: using JSONArray), 
the sink 
+ * checks if there is an error entry in the result HashMap and throws a 
runtime exception if errors
+ * counts is not zero.  The exception message contains the results HashMap. 
+ * In case of multiple errors only the first error is noted in the result 
HashMap.
+ * </b>
+ * 
+ * @see {@link ConflictStrategy} for conflict resolution strategies
+ * 
+ * @param <OUT> a value that can be transformed into a {@link 
org.json.simple.JSONArray;} or {@link org.json.simple.JSONObject}
+ */
+public class RethinkDBSink<OUT> extends RichSinkFunction<OUT> implements 
Serializable{
+
+       /**
+        * Serial version for the class
+        */
+       private static final long serialVersionUID = -2135499016796158755L;
+
+       /**
+        * Logger for the class
+        */
+       private static final Logger LOG = 
LoggerFactory.getLogger(RethinkDBSink.class);
+
+       /**
+        * Conflict resolution option key in case document ids are same 
+        */
+       public static final String CONFLICT_OPT = "conflict";
+
+       /**
+        * Result key indicating number of errors
+        */
+       public static final String RESULT_ERROR_KEY = "errors";
+
+       /**
+        * Serialization schema for the sink
+        */
+       private JSONSerializationSchema<OUT> serializationSchema;
+
+       /**
+        * RethinkDB connection object
+        */
+       private transient Connection rethinkDbConnection;
+
+       /**
+        * RethinkDB hostname
+        */
+       private String hostname;
+
+       /**
+        * RethinkDB port
+        */
+       private int hostport;
+
+       /**
+        * RethinkDB tablename where documents are inserted
+        */
+       private String tableName;
+
+       /**
+        * RethinkDB database where document are inserted
+        */
+       private String databaseName;
+
+       /**
+        * Conflict resolution strategy
+        */
+       private ConflictStrategy conflict;
+       
+       /**
+        * Default user name
+        */
+       public static final String DEFAULT_USER_NAME = "admin";
+       
+       /**
+        * User name
+        */
+       private String username = DEFAULT_USER_NAME;
+       
+       /**
+        * Default user name
+        */
+       public static final String DEFAULT_PASSWORD = "";
+       
+       /**
+        * password
+        */
+       private String password = DEFAULT_PASSWORD;
+
+       /**
+        * Durability configuration
+        */
+       protected Durability durability = Durability.hard;
+       
+       /**
+        * Constructor for RethinkDB sink
+        * @param hostname
+        * @param hostport
+        * @param database
+        * @param table
+        * @param schema
+        */
+       public RethinkDBSink(String hostname, int hostport, String database, 
String table, 
+                       JSONSerializationSchema<OUT> schema) {
+               this(hostname, hostport, database, table, schema, 
ConflictStrategy.update);
+       }
+
+       /**
+        * Constructor for sink
+        * @param hostname
+        * @param hostport
+        * @param database name
+        * @param table name
+        * @param schema serialization converter
+        * @param conflict resolution strategy for document id conflict
+        */
+       public RethinkDBSink(String hostname, int hostport, String database, 
String table, 
+                       JSONSerializationSchema<OUT> schema, 
+                       ConflictStrategy conflict) {
+               this.hostname = Objects.requireNonNull(hostname);
+               this.hostport = hostport;
+               this.databaseName = Objects.requireNonNull(database);
+               this.tableName = Objects.requireNonNull(table);
+               this.serializationSchema = Objects.requireNonNull(schema);
+               this.conflict = conflict;
+       }
+
+       /**
+        * Open the sink
+        */
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               LOG.info("Received parameters : {}", parameters);
+               
+               super.open(parameters);
+
+               rethinkDbConnection = 
getRethinkDB().connection().hostname(hostname)
+                               .port(hostport).user(username, 
password).connect();
+
+               LOG.info("RethinkDb connection created for host {} port {} and 
db {}", 
+                               hostname, hostport,databaseName);
+       }
+
+       /**
+        * Helper method to help testability
+        * @return RethinkDB instance
+        */ 
+       protected RethinkDB getRethinkDB() {
+               return RethinkDB.r;
+       }
+       
+       /**
+        * Set username and password. If username and password are not provided,
+        * then default username (admin) and blank password are used.
+        * 
+        * @param username cannot be blank/null
+        * @param password cannot be null
+        * 
+        * @throws IllegalArgumentException if arguments is null or empty
+        */
+       public void setUsernameAndPassword(String username, String password) {
+               
+               if ( StringUtils.isBlank(username) )  {
+                       throw new IllegalArgumentException("username " + 
username + " cannot be null or empty" ); 
+               } else {
+                       this.username = username;
+               }
+               
+               this.password = (password == null) ? "" : password;
+       }
+       
+       /**
+        * Invoke the sink with the input
+        * 
+        * @param the value to be inserted
+        * 
+        * @throws RuntimeException if there are errors while inserting row 
into rethinkdb
+        */
+       @Override
+       public void invoke(OUT value) throws Exception {
+               LOG.debug("Received value {}", value);
+               
+               Object json = serializationSchema.toJSON(value);
+               LOG.debug("Object/Json: {}/{}", value, json);
+               Insert insert = getRdbTable().insert(json)
+                               .optArg(CONFLICT_OPT, conflict.toString())
+                               .optArg("durability", durability.name());
+               HashMap<String,Object> result = runInsert(insert);
+               
+               LOG.debug("Object/Json/Result: {}/{}/{}", value, json, result);
+               
+               if ( (Long)result.get(RESULT_ERROR_KEY) != 0 ) {
+                       LOG.error("There were errors while inserting data 
value/result {}/{}", value, result);
+                       throw new RuntimeException("Errors " + result + " while 
inserting " + value );
+               }
+       }
+
+       protected HashMap<String,Object> runInsert(Insert insert) {
+               return insert.run(rethinkDbConnection);
+       }
+
+       /**
+        * Close the sink
+        */
+       @Override
+       public void close() throws Exception {
+               LOG.info("Closing connection");
+               
+               rethinkDbConnection.close();
+               
+               super.close();
+       }
+
+       /**
+        * Set durability
+        * 
+        * @param durability cannot be <code>null</code>
+        * 
+        * @throws IllegalArgumentException if durability is <code>null</code>
+        * 
+        * @see #Durability
+        */
+       public void setDurability(Durability durability) {
+               if ( durability == null ) {
+                       throw new IllegalArgumentException("Durability cannot 
be null " + durability);
+               }
+               this.durability = durability;
+       }
+
+       /**
+        * @return the durability
+        */
+       public Durability getDurability() {
+               return durability;
+       }
+
+       /**
+        * @return the conflict
+        */
+       public ConflictStrategy getConflictStrategy() {
+               return conflict;
+       }
+
+       /**
+        * @param conflict the conflict to set
+        */
+       public void setConflictStrategy(ConflictStrategy conflict) {
+               if ( conflict == null ) {
+                       throw new IllegalArgumentException("ConflictStrategy 
cannot be null " + conflict);
+               }
+               
+               this.conflict = conflict;
+       }
+
+       /**
+        * @return the rethinkDbConnection
+        */
+       protected Connection getRethinkDbConnection() {
+               return rethinkDbConnection;
+       }
+
+       /**
+        * @return the rdbTable
+        */
+       protected Table getRdbTable() {
+               return getRethinkDB().db(databaseName).table(tableName);
+       }
+
+       /**
+        * @return the hostname
+        */
+       public String getHostname() {
+               return hostname;
+       }
+
+       /**
+        * @return the hostport
+        */
+       public int getHostport() {
+               return hostport;
+       }
+
+       /**
+        * @return the tableName
+        */
+       public String getTableName() {
+               return tableName;
+       }
+
+       /**
+        * @return the databaseName
+        */
+       public String getDatabaseName() {
+               return databaseName;
+       }
+
+       /**
+        * @return the username
+        */
+       public String getUsername() {
+               return username;
+       }
+
+       /**
+        * @return the password
+        */
+       public String getPassword() {
+               return password;
+       }
+
+}
\ No newline at end of file
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/StringJSONSerializationSchema.java
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/StringJSONSerializationSchema.java
new file mode 100644
index 00000000000..a72222770e5
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/StringJSONSerializationSchema.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb;
+
+import org.json.simple.parser.JSONParser;
+
+/**
+ * Serialization schema that parses a JSON string into a json object/array.
+ * 
+ * @see JSONSerializationSchema
+ * 
+ * @see org.json.simple.JSONArray
+ * @see org.json.simple.JSONObject
+ */
+public class StringJSONSerializationSchema implements JSONSerializationSchema<String> {
+
+       /**
+        * Serial version of the class
+        */
+       private static final long serialVersionUID = 686588590347479359L;
+
+       /**
+        * Parse the input string with a freshly created {@link JSONParser}.
+        * 
+        * @param input string holding the JSON text
+        * @return whatever the parser produces for the input
+        * 
+        * @throws Exception in case of problems parsing input string
+        */
+       @Override
+       public Object toJSON(String input) throws Exception {
+               // A new parser per call, exactly as before; no parser state is shared.
+               return new JSONParser().parse(input);
+       }
+
+}
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/JSONArraySerializationSchemaTest.java
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/JSONArraySerializationSchemaTest.java
new file mode 100644
index 00000000000..96381b970c2
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/JSONArraySerializationSchemaTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb;
+
+import static org.junit.Assert.assertEquals;
+
+import org.json.simple.JSONArray;
+import org.junit.Before;
+import org.junit.Test;
+
+public class JSONArraySerializationSchemaTest {
+
+       /** Schema under test, re-created before every test method. */
+       protected JSONArraySerializationSchema schema;
+
+       @Before
+       public void setUp() {
+               this.schema = new JSONArraySerializationSchema();
+       }
+
+       /** An array given to the schema must come back equal to the input. */
+       @Test
+       public void testToJSONWithJSONArray() throws Exception {
+               JSONArray input = new JSONArray();
+               Object converted = schema.toJSON(input);
+               JSONArray output = (JSONArray) converted;
+               assertEquals(input, output);
+       }
+
+       /** A null input must come back as null. */
+       @Test
+       public void testToJSONWithNull() throws Exception {
+               Object converted = schema.toJSON(null);
+               JSONArray output = (JSONArray) converted;
+               assertEquals(null, output);
+       }
+}
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/RethinkDBSinkTest.java
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/RethinkDBSinkTest.java
new file mode 100644
index 00000000000..4532b8c3b96
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/RethinkDBSinkTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+
+import org.apache.flink.configuration.Configuration;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+
+import org.mockito.runners.MockitoJUnitRunner;
+
+import com.rethinkdb.RethinkDB;
+import com.rethinkdb.gen.ast.Db;
+import com.rethinkdb.gen.ast.Insert;
+import com.rethinkdb.gen.ast.Table;
+import com.rethinkdb.net.Connection;
+
+/**
+ * Unit tests for {@link RethinkDBSink}.  The RethinkDB driver objects are
+ * Mockito mocks and the sink under test overrides <code>getRethinkDB</code>
+ * and <code>runInsert</code>, so the tests only interact with mocks and a
+ * canned result map.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RethinkDBSinkTest {
+
+       private static final String JSON_TEST_TABLE = "JsonTestTable";
+
+       /** Sink under test, rebuilt in {@link #setUp()}. */
+       protected RethinkDBSink<String> sink;
+
+       @Mock
+       protected RethinkDB mockRethinkDB;
+
+       @Mock
+       protected Connection mockRethinkDBConnection;
+
+       @Mock(answer=Answers.RETURNS_DEEP_STUBS)
+       private Table mockRethinkDBTable;
+
+       @Mock(answer=Answers.RETURNS_DEEP_STUBS)
+       protected Db mockDb;
+
+       @Mock(answer=Answers.RETURNS_DEEP_STUBS)
+       protected Insert mockInsert;
+
+       @Mock(answer=Answers.RETURNS_DEEP_STUBS)
+       private com.rethinkdb.net.Connection.Builder builder;
+
+       /** Result map handed back by the overridden <code>runInsert</code>. */
+       protected HashMap<String,Object> result = new HashMap<>();
+
+       /**
+        * Builds the sink under test and wires the mocked driver objects together.
+        */
+       @Before
+       public void setUp() {
+               // NOTE(review): this replaces the instance injected through @Mock with a
+               // plain (non deep-stub) mock - presumably intentional, please confirm.
+               mockRethinkDBTable = mock(Table.class);
+               sink = new RethinkDBSink<String>
+                       ("localhost", 28015, "test", JSON_TEST_TABLE, new StringJSONSerializationSchema()){
+                               private static final long serialVersionUID = 1L;
+                               @Override
+                               protected RethinkDB getRethinkDB() {
+                                       return mockRethinkDB;
+                               }
+                               @Override
+                               protected HashMap<String, Object> runInsert(Insert insert) {
+                                       return result;
+                               }
+               };
+               when(mockRethinkDB.connection()).thenReturn(builder);
+               when(builder.hostname("localhost").port(28015).user("admin", "").connect()).
+                       thenReturn(mockRethinkDBConnection);
+               when(mockRethinkDB.db(any())).thenReturn(mockDb);
+       }
+
+       /**
+        * The getters must report the constructor arguments; username and password
+        * are expected to be "admin" and the empty string when never set.
+        */
+       @Test
+       public void testGetters() throws Exception {
+               assertEquals("test", sink.getDatabaseName());
+               assertEquals(JSON_TEST_TABLE, sink.getTableName());
+               assertEquals(28015, sink.getHostport());
+               assertEquals("localhost", sink.getHostname());
+               assertEquals("admin", sink.getUsername());
+               assertEquals("", sink.getPassword());
+       }
+
+       /**
+        * After <code>open</code> the sink must expose the mocked connection and table.
+        */
+       @Test
+       public void testOpen() throws Exception {
+
+               when(mockRethinkDB.db(any()).table(Mockito.eq(JSON_TEST_TABLE))).thenReturn(mockRethinkDBTable);
+
+               sink.open(new Configuration());
+
+               Connection connection = sink.getRethinkDbConnection();
+               Table table = sink.getRdbTable();
+
+               assertEquals("Connection should be same", mockRethinkDBConnection, connection);
+               assertEquals("Table should be same", mockRethinkDBTable, table);
+       }
+
+       /**
+        * Closing an opened sink must close the connection.
+        */
+       @Test
+       public void testClose() throws Exception {
+               sink.open(new Configuration());
+               sink.close();
+               verify(mockRethinkDBConnection).close();
+       }
+
+       /**
+        * With an error count of 0 in the insert result, <code>invoke</code> must
+        * insert the parsed document and set the conflict option.
+        */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testInvokeSuccess() throws Exception {
+               result.put(RethinkDBSink.RESULT_ERROR_KEY, 0L);
+               when(mockRethinkDB.db(Mockito.any()).table(JSON_TEST_TABLE)).thenReturn(mockRethinkDBTable);
+
+               JSONObject json = new JSONObject();
+               json.put("key1", "value1");
+
+               when(mockRethinkDBTable.insert(json)).thenReturn(mockInsert);
+               when(mockInsert.optArg(RethinkDBSink.CONFLICT_OPT,
+                               ConflictStrategy.update.toString())).thenReturn(mockInsert);
+               sink.open(new Configuration());
+               sink.invoke(json.toString());
+
+               verify(mockInsert).optArg(RethinkDBSink.CONFLICT_OPT,
+                               ConflictStrategy.update.toString());
+               verify(mockRethinkDBTable).insert(json);
+       }
+
+       /**
+        * With an error count of 1 in the insert result, <code>invoke</code> is
+        * expected to throw a RuntimeException; the insert itself must still have
+        * been issued with the conflict option set.
+        */
+       @SuppressWarnings("unchecked")
+       @Test(expected=RuntimeException.class)
+       public void testInvokeErrors() throws Exception {
+
+               result.put(RethinkDBSink.RESULT_ERROR_KEY, 1L);
+               when(mockRethinkDB.db(Mockito.any()).table(JSON_TEST_TABLE)).thenReturn(mockRethinkDBTable);
+
+               JSONObject json = new JSONObject();
+               json.put("key1", "value1");
+
+               when(mockRethinkDBTable.insert(json)).thenReturn(mockInsert);
+               when(mockInsert.optArg(RethinkDBSink.CONFLICT_OPT,
+                               ConflictStrategy.update.toString())).thenReturn(mockInsert);
+               sink.open(new Configuration());
+
+               try {
+                       sink.invoke(json.toString());
+               }
+               finally {
+                       // Verified in finally so the checks run although invoke() throws.
+                       verify(mockInsert).optArg(RethinkDBSink.CONFLICT_OPT,
+                                       ConflictStrategy.update.toString());
+                       // Was a second, identical optArg verification; check the insert
+                       // call instead, as testInvokeSuccess does.
+                       verify(mockRethinkDBTable).insert(json);
+               }
+       }
+
+       /** A null username must be rejected. */
+       @Test(expected=IllegalArgumentException.class)
+       public void testNullUsername() throws Exception {
+               sink.setUsernameAndPassword(null, "abcd");
+       }
+
+       /** An empty username must be rejected. */
+       @Test(expected=IllegalArgumentException.class)
+       public void testEmptyUsername() throws Exception {
+               sink.setUsernameAndPassword("", "abcd");
+       }
+
+       /** Without explicit credentials the sink reports "admin" and an empty password. */
+       @Test
+       public void testDefaultUserNameAndPassword() throws Exception {
+               assertEquals("Password should be equal", "", sink.getPassword());
+               assertEquals("Username should be equal", "admin", sink.getUsername());
+       }
+
+       /** Explicitly set credentials must be returned by the getters. */
+       @Test
+       public void testSetUserNameAndPassword() throws Exception {
+               sink.setUsernameAndPassword("u1", "abcd");
+               assertEquals("Password should be equal", "abcd", sink.getPassword());
+               assertEquals("Username should be equal", "u1", sink.getUsername());
+       }
+
+       /** A null password must be reported as the empty string. */
+       @Test
+       public void testNullPassword() throws Exception {
+               sink.setUsernameAndPassword("user", null);
+               assertEquals("Password should be equal", "", sink.getPassword());
+       }
+
+       /** An empty password must be accepted and returned unchanged. */
+       @Test
+       public void testEmptyPassword() throws Exception {
+               sink.setUsernameAndPassword("abcd", "");
+               assertEquals("Password should be equal", "", sink.getPassword());
+       }
+
+       /** A null durability must be rejected. */
+       @Test(expected=IllegalArgumentException.class)
+       public void testNullDurablity() throws Exception {
+               sink.setDurability(null);
+       }
+
+       /** Durability starts as hard and can be switched to soft. */
+       @Test
+       public void testDurabilityToSoft() throws Exception {
+               assertEquals("Durability should be equal", Durability.hard, sink.getDurability());
+               sink.setDurability(Durability.soft);
+               assertEquals("Durability should be equal", Durability.soft, sink.getDurability());
+       }
+
+       /** Setting hard durability on a fresh sink keeps it hard. */
+       @Test
+       public void testDurabilityToHard() throws Exception {
+               assertEquals("Durability should be equal", Durability.hard, sink.getDurability());
+               sink.setDurability(Durability.hard);
+               assertEquals("Durability should be equal", Durability.hard, sink.getDurability());
+       }
+
+       /** Conflict strategy starts as update and can be switched to error. */
+       @Test
+       public void testConflictToError() throws Exception {
+               assertEquals("Conflict should be equal", ConflictStrategy.update, sink.getConflictStrategy());
+               sink.setConflictStrategy(ConflictStrategy.error);
+               assertEquals("ConflictStrategy should be equal", ConflictStrategy.error, sink.getConflictStrategy());
+       }
+
+       /** Setting the update strategy on a fresh sink keeps it at update. */
+       @Test
+       public void testConflictToUpdate() throws Exception {
+               assertEquals("Conflict should be equal", ConflictStrategy.update, sink.getConflictStrategy());
+               sink.setConflictStrategy(ConflictStrategy.update);
+               assertEquals("ConflictStrategy should be equal", ConflictStrategy.update, sink.getConflictStrategy());
+       }
+
+       /** Conflict strategy starts as update and can be switched to replace. */
+       @Test
+       public void testConflictToReplace() throws Exception {
+               assertEquals("Conflict should be equal", ConflictStrategy.update, sink.getConflictStrategy());
+               sink.setConflictStrategy(ConflictStrategy.replace);
+               assertEquals("ConflictStrategy should be equal", ConflictStrategy.replace, sink.getConflictStrategy());
+       }
+
+       /** A null conflict strategy must be rejected. */
+       @Test(expected=IllegalArgumentException.class)
+       public void testNullConflictStrategy() throws Exception {
+               sink.setConflictStrategy(null);
+       }
+}
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/StringJSONSerializationSchemaTest.java
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/StringJSONSerializationSchemaTest.java
new file mode 100644
index 00000000000..dea2cf4203e
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/StringJSONSerializationSchemaTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb;
+
+import static org.junit.Assert.*;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link StringJSONSerializationSchema}: JSON text is passed
+ * to <code>toJSON</code> and the returned value is compared with the
+ * json-simple object the text was produced from.
+ */
+public class StringJSONSerializationSchemaTest {
+
+       /** Schema under test; re-created before every test method. */
+       protected StringJSONSerializationSchema schema;
+
+       @Before
+       public void setUp() {
+               this.schema = new StringJSONSerializationSchema();
+       }
+
+       /**
+        * A JSON object containing a nested object must round-trip through its
+        * string form.
+        */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testJSONObject() throws Exception {
+               final JSONObject inner = new JSONObject();
+               inner.put("nest1", "nestedValue1");
+
+               final JSONObject expected = new JSONObject();
+               expected.put("key1", "value1");
+               expected.put("nested", inner);
+
+               final String text = expected.toString();
+               final JSONObject parsed = (JSONObject) schema.toJSON(text);
+               assertEquals("Json objects should be same", expected, parsed);
+       }
+
+       /**
+        * Text that is not JSON is expected to fail with a ParseException.
+        */
+       @Test(expected=ParseException.class)
+       public void testJSONObjectBadInput() throws Exception {
+               final String notJson = "bad json";
+               @SuppressWarnings("unused")
+               JSONObject ignored = (JSONObject) schema.toJSON(notJson);
+       }
+
+       /**
+        * A JSON array holding two objects must round-trip through its string form.
+        */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testJSONArray() throws Exception {
+               final JSONObject first = new JSONObject();
+               first.put("key1", "value1");
+               final JSONObject second = new JSONObject();
+               second.put("key2", "value2");
+
+               final JSONArray expected = new JSONArray();
+               expected.add(first);
+               expected.add(second);
+
+               final JSONArray parsed = (JSONArray) schema.toJSON(expected.toString());
+               assertEquals("Json objects should be same", expected, parsed);
+       }
+}
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/integration/JSONEventGenerator.java
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/integration/JSONEventGenerator.java
new file mode 100644
index 00000000000..5ca8ab5b6cf
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/integration/JSONEventGenerator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb.integration;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+/**
+ * Test utility to generate json events for the IntegrationTest.  This class
+ * produces both JSONObject and JSONArray strings for testing the sink. The
+ * JSONObjects carry an <code>id</code> field when the generator was created
+ * with <code>addId == true</code>.
+ */
+public class JSONEventGenerator implements SourceFunction<String> {
+
+       /**
+        * Serial version
+        */
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Last name field for json
+        */
+       private static final String LAST = "last";
+
+       /**
+        * First name field for json
+        */
+       private static final String FIRST = "first";
+
+       /**
+        * name field for json
+        */
+       private static final String NAME = "name";
+
+       /**
+        * Id field for json
+        */
+       private static final String ID = "id";
+
+       /**
+        * Whether generated objects get an id field; fixed at construction time
+        */
+       private final boolean addId;
+
+       /**
+        * Flag to indicate the source is running.  Volatile because
+        * {@link #cancel()} is called from a different thread than the one
+        * looping in {@link #run}; without it the loop may never observe the update.
+        */
+       private volatile boolean running = true;
+
+       /**
+        * Constructor
+        * @param addId - whether to add an id field to the json objects or not
+        * The "id" field is default id for RethinkDB documents
+        */
+       public JSONEventGenerator(boolean addId) {
+               this.addId = addId;
+       }
+
+       /**
+        * Emit events until {@link #cancel()} is called.  Every iteration sleeps
+        * 10 ms, builds two objects and emits exactly one string: a JSON array
+        * (holding the first object on every second iteration and/or the second
+        * object on every third iteration), or the first object alone when the
+        * array would be empty.
+        *
+        * @param ctx context the generated json strings are collected into
+        * @throws Exception if the sleep between events is interrupted
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public void run(SourceContext<String> ctx) throws Exception {
+               long seqSuffix = 0;
+               long loops = 0;
+               while (running) {
+                       Thread.sleep(10);
+
+                       seqSuffix++;
+                       JSONObject json1 = new JSONObject();
+                       if (addId) {
+                               json1.put(ID, "1" + seqSuffix);
+                       }
+                       json1.put(NAME, "john stu " + seqSuffix);
+                       json1.put(FIRST, "john " + seqSuffix);
+                       json1.put(LAST, "stu "+ seqSuffix);
+
+                       seqSuffix++;
+                       JSONObject json2 = new JSONObject();
+                       if (addId) {
+                               json2.put(ID, "2" + seqSuffix);
+                       }
+                       json2.put(NAME, "jane doe"+ seqSuffix);
+                       json2.put(FIRST, "jane"+ seqSuffix);
+                       json2.put(LAST, "doe"+ seqSuffix);
+
+                       JSONArray array = new JSONArray();
+                       loops++;
+                       if (loops % 2 == 0) {
+                               array.add(json1);
+                       }
+                       if (loops % 3 == 0) {
+                               array.add(json2);
+                       }
+
+                       // Emit the array when it has content, otherwise a single object,
+                       // so the sink sees both kinds of input.
+                       if (array.size() != 0) {
+                               ctx.collect(array.toString());
+                       } else {
+                               ctx.collect(json1.toString());
+                       }
+               }
+       }
+
+       /**
+        * Cancel producing events
+        */
+       @Override
+       public void cancel() {
+               running = false;
+       }
+
+}
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/integration/RethinkDBITCase.java
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/integration/RethinkDBITCase.java
new file mode 100644
index 00000000000..2738ed93821
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/java/org/apache/flink/streaming/connectors/rethinkdb/integration/RethinkDBITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb.integration;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.rethinkdb.RethinkDBSink;
+import 
org.apache.flink.streaming.connectors.rethinkdb.StringJSONSerializationSchema;
+
+/**
+ * Manually started integration program for the RethinkDB sink.  It needs a
+ * running RethinkDB instance; the connection settings are read from the
+ * <code>hostname</code>, <code>hostport</code>, <code>dbname</code> and
+ * <code>table</code> program arguments.  Two json event generators are
+ * attached to the sink - one that generates an <code>id</code> field and
+ * another without <code>id</code>.
+ */
+public class RethinkDBITCase {
+
+       public static void main(String[] args) throws Exception {
+
+               final ParameterTool params = ParameterTool.fromArgs(args);
+
+               // Connection settings; each falls back to the default given here.
+               final String host = params.get("hostname", "localhost");
+               final int port = params.getInt("hostport", 28015);
+               final String database = params.get("dbname", "testdb");
+               final String tableName = params.get("table", "testtable");
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+
+               // One source emits documents with an id field, the other without.
+               final DataStream<String> eventsWithId = env.addSource(new JSONEventGenerator(true));
+               final DataStream<String> eventsWithoutId = env.addSource(new JSONEventGenerator(false));
+
+               final StringJSONSerializationSchema jsonSchema = new StringJSONSerializationSchema();
+               final RethinkDBSink<String> rethinkSink =
+                               new RethinkDBSink<String>(host, port, database, tableName, jsonSchema);
+
+               // Both streams are written through the same sink instance.
+               eventsWithId.addSink(rethinkSink);
+               eventsWithoutId.addSink(rethinkSink);
+
+               env.execute();
+       }
+
+}
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/test/resources/log4j.properties
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..14f3ba32b0b
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=WARN, logger
+
+log4j.appender.logger=org.apache.log4j.ConsoleAppender
+log4j.appender.logger.target = System.err
+log4j.appender.logger.layout=org.apache.log4j.PatternLayout
+log4j.appender.logger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, logger
\ No newline at end of file
diff --git 
a/flink-streaming-connectors/flink-connector-rethinkdb/src/test/resources/logback.xml
 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/resources/logback.xml
new file mode 100644
index 00000000000..45db4bac0df
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-rethinkdb/src/test/resources/logback.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file
diff --git a/flink-streaming-connectors/pom.xml 
b/flink-streaming-connectors/pom.xml
index 2bfd9b2f18f..84d08bb6fbc 100644
--- a/flink-streaming-connectors/pom.xml
+++ b/flink-streaming-connectors/pom.xml
@@ -45,6 +45,7 @@ under the License.
                <module>flink-connector-rabbitmq</module>
                <module>flink-connector-twitter</module>
                <module>flink-connector-nifi</module>
+               <module>flink-connector-rethinkdb</module>
        </modules>
 
        <!-- See main pom.xml for explanation of profiles -->


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Provide RethinkDB Sink for Flink
> --------------------------------
>
>                 Key: FLINK-3967
>                 URL: https://issues.apache.org/jira/browse/FLINK-3967
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>    Affects Versions: 1.0.3
>         Environment: All
>            Reporter: Mans Singh
>            Assignee: Mans Singh
>            Priority: Minor
>              Labels: features, pull-request-available
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Provide Sink to stream data from flink to rethink db.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to