http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
new file mode 100644
index 0000000..2ed2f8c
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+
+/**
+ * A {@link ParameterValuesProvider} that performs no computation of its own: it only
+ * wraps the query parameters that the user computed before creating the
+ * {@link JDBCInputFormat} instance and hands them back on request.
+ */
+public class GenericParameterValuesProvider implements ParameterValuesProvider {
+
+       /** User-supplied parameter matrix, kept by reference (no copy is made). */
+       private final Serializable[][] parameters;
+
+       /**
+        * @param parameters the precomputed parameter matrix to hand back unchanged
+        */
+       public GenericParameterValuesProvider(Serializable[][] parameters) {
+               this.parameters = parameters;
+       }
+
+       /**
+        * @return the same array instance that was passed to the constructor
+        */
+       @Override
+       public Serializable[][] getParameterValues() {
+               //nothing to compute here: the values were prepared by the caller
+               return this.parameters;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
new file mode 100644
index 0000000..ac56b98
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+/**
+ * Generates the parameters for a query that contains a BETWEEN constraint on a numeric
+ * column. The closed interval from min to max is split into consecutive ranges that
+ * cover fetchSize values each; the last range is shorter when the number of values in
+ * the interval is not a multiple of fetchSize.
+ *
+ * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>,
+ * using a query like:
+ * <PRE>
+ *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * </PRE>
+ *
+ * you can use this class to automatically generate the parameters of the BETWEEN clause,
+ * based on the passed constructor parameters. With fetchSize 3, min 0 and max 7 the
+ * generated ranges are [0, 2], [3, 5] and [6, 7].
+ */
+public class NumericBetweenParametersProvider implements ParameterValuesProvider {
+
+       private final long fetchSize;
+       private final long min;
+       private final long max;
+
+       /**
+        * @param fetchSize number of consecutive values covered by each range, must be positive
+        * @param min lower bound (inclusive) of the interval to split
+        * @param max upper bound (inclusive) of the interval to split, must not be less than min
+        * @throws IllegalArgumentException if fetchSize is not positive, if min is greater than max,
+        *         or if the interval would be split into more ranges than an array can hold
+        */
+       public NumericBetweenParametersProvider(long fetchSize, long min, long max) {
+               if (fetchSize <= 0) {
+                       throw new IllegalArgumentException("fetchSize must be positive but was " + fetchSize);
+               }
+               if (min > max) {
+                       throw new IllegalArgumentException("min (" + min + ") must not be greater than max (" + max + ")");
+               }
+               //min <= max holds here, so a negative difference can only mean that the subtraction overflowed
+               final long span = max - min;
+               if (span < 0 || span / fetchSize >= Integer.MAX_VALUE) {
+                       throw new IllegalArgumentException("Splitting [" + min + ", " + max + "] by " + fetchSize
+                                       + " would produce more ranges than an array can hold");
+               }
+               this.fetchSize = fetchSize;
+               this.min = min;
+               this.max = max;
+       }
+
+       /**
+        * Computes the BETWEEN bounds of every range.
+        *
+        * @return one row per range, each row being a {@code Long[]} holding the inclusive lower
+        *         and upper bound of that range
+        */
+       @Override
+       public Serializable[][] getParameterValues() {
+               //equals ceil((max - min + 1) / fetchSize), written so that the "+ 1" cannot overflow;
+               //the constructor guarantees that the result fits into an int
+               final int rangeCount = (int) ((max - min) / fetchSize + 1);
+               final Serializable[][] parameters = new Serializable[rangeCount][];
+               for (int range = 0; range < rangeCount; range++) {
+                       //range * fetchSize never exceeds (max - min), hence start never exceeds max
+                       final long start = min + range * fetchSize;
+                       //clamp the last range so that it never reaches past max
+                       final long end = (max - start < fetchSize) ? max : start + fetchSize - 1;
+                       parameters[range] = new Long[]{start, end};
+               }
+               return parameters;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
new file mode 100644
index 0000000..c194497
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+
+/**
+ * Contract used by the {@link JDBCInputFormat} to compute the list of queries to run in
+ * parallel (i.e. the splits). Each query is parameterized with one row of the matrix
+ * provided by the {@link ParameterValuesProvider} implementation.
+ */
+public interface ParameterValuesProvider {
+
+       /**
+        * Returns the parameter values needed to query a table in parallel.
+        *
+        * @return the parameter matrix, one row per query
+        */
+       Serializable[][] getParameterValues();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
new file mode 100644
index 0000000..da9469b
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 
org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
+import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.table.Row;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Runs a complete job that reads rows with a {@link JDBCInputFormat} and writes them into
+ * the newbooks table with a {@link JDBCOutputFormat}, once with a single query and once
+ * with a query split by a {@link NumericBetweenParametersProvider}.
+ */
+public class JDBCFullTest extends JDBCTestBase {
+
+       @Test
+       public void testJdbcInOut() throws Exception {
+               //run without parallelism
+               runTest(false);
+
+               //cleanup
+               JDBCTestBase.tearDownClass();
+               JDBCTestBase.prepareTestDb();
+
+               //run exploiting parallelism
+               runTest(true);
+       }
+
+       /**
+        * Executes the read/write job and checks the number of rows that ended up in the target table.
+        * Exceptions are deliberately not caught: letting them propagate keeps the full stack trace
+        * and the root cause in the test report.
+        *
+        * @param exploitParallelism whether to use the splittable query together with a parameters provider
+        */
+       private void runTest(boolean exploitParallelism) throws Exception {
+               ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+               JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername(JDBCTestBase.DRIVER_CLASS)
+                               .setDBUrl(JDBCTestBase.DB_URL)
+                               .setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
+                               .setRowTypeInfo(rowTypeInfo);
+
+               if (exploitParallelism) {
+                       final int fetchSize = 1;
+                       final int lastRow = JDBCTestBase.testData.length - 1;
+                       final long min = Long.parseLong(JDBCTestBase.testData[0][0].toString());
+                       final long max = Long.parseLong(JDBCTestBase.testData[lastRow][0].toString());
+                       //use a "splittable" query to exploit parallelism
+                       inputBuilder = inputBuilder
+                                       .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+                                       .setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max));
+               }
+               DataSet<Row> source = environment.createInput(inputBuilder.finish());
+
+               //NOTE: in this case (with Derby driver) setSqlTypes could be skipped, but
+               //some databases don't handle null values correctly when no column type is specified
+               //in PreparedStatement.setObject (see its javadoc for more details)
+               source.output(JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername(JDBCTestBase.DRIVER_CLASS)
+                               .setDBUrl(JDBCTestBase.DB_URL)
+                               .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
+                               .setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER})
+                               .finish());
+               environment.execute();
+
+               try (
+                       Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
+                       PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
+                       ResultSet resultSet = statement.executeQuery()
+               ) {
+                       int count = 0;
+                       while (resultSet.next()) {
+                               count++;
+                       }
+                       Assert.assertEquals(JDBCTestBase.testData.length, count);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
new file mode 100644
index 0000000..efae076
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.ResultSet;
+
+import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
+import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.core.io.InputSplit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link JDBCInputFormat}: rejected configurations, reading with a single query,
+ * and reading with queries split by a {@link ParameterValuesProvider}.
+ */
+public class JDBCInputFormatTest extends JDBCTestBase {
+
+       private JDBCInputFormat jdbcInputFormat;
+
+       @After
+       public void tearDown() throws IOException {
+               if (jdbcInputFormat != null) {
+                       jdbcInputFormat.close();
+               }
+               jdbcInputFormat = null;
+       }
+
+       //NOTE(review): this test uses the same non-existent driver name as testInvalidDriver, so the
+       //expected exception may be caused by the driver rather than by the missing row type info - confirm
+       @Test(expected = IllegalArgumentException.class)
+       public void testUntypedRowInfo() throws IOException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername("org.apache.derby.jdbc.idontexist")
+                               .setDBUrl(DB_URL)
+                               .setQuery(SELECT_ALL_BOOKS)
+                               .finish();
+               jdbcInputFormat.openInputFormat();
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidDriver() throws IOException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername("org.apache.derby.jdbc.idontexist")
+                               .setDBUrl(DB_URL)
+                               .setQuery(SELECT_ALL_BOOKS)
+                               .setRowTypeInfo(rowTypeInfo)
+                               .finish();
+               jdbcInputFormat.openInputFormat();
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidURL() throws IOException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+                               .setQuery(SELECT_ALL_BOOKS)
+                               .setRowTypeInfo(rowTypeInfo)
+                               .finish();
+               jdbcInputFormat.openInputFormat();
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidQuery() throws IOException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setDBUrl(DB_URL)
+                               .setQuery("iamnotsql")
+                               .setRowTypeInfo(rowTypeInfo)
+                               .finish();
+               jdbcInputFormat.openInputFormat();
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testIncompleteConfiguration() throws IOException {
+               //no DB url is set on this builder
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setQuery(SELECT_ALL_BOOKS)
+                               .setRowTypeInfo(rowTypeInfo)
+                               .finish();
+       }
+
+       @Test
+       public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setDBUrl(DB_URL)
+                               .setQuery(SELECT_ALL_BOOKS)
+                               .setRowTypeInfo(rowTypeInfo)
+                               .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+                               .finish();
+               //this query does not exploit parallelism
+               Assert.assertEquals(1, jdbcInputFormat.createInputSplits(1).length);
+               jdbcInputFormat.openInputFormat();
+               jdbcInputFormat.open(null);
+               Row row =  new Row(5);
+               int recordCount = 0;
+               while (!jdbcInputFormat.reachedEnd()) {
+                       Row next = jdbcInputFormat.nextRecord(row);
+                       if (next == null) {
+                               break;
+                       }
+                       assertFieldTypes(next);
+                       assertMatchesTestData(recordCount, next);
+                       recordCount++;
+               }
+               jdbcInputFormat.close();
+               jdbcInputFormat.closeInputFormat();
+               Assert.assertEquals(testData.length, recordCount);
+       }
+
+       @Test
+       public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
+               final int fetchSize = 1;
+               final int lastRow = JDBCTestBase.testData.length - 1;
+               final long min = Long.parseLong(String.valueOf(JDBCTestBase.testData[0][0]));
+               final long max = Long.parseLong(String.valueOf(JDBCTestBase.testData[lastRow][0]));
+               ParameterValuesProvider paramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setDBUrl(DB_URL)
+                               .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+                               .setRowTypeInfo(rowTypeInfo)
+                               .setParametersProvider(paramProvider)
+                               .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+                               .finish();
+
+               jdbcInputFormat.openInputFormat();
+               InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
+               //this query exploits parallelism (1 split for every id)
+               Assert.assertEquals(testData.length, splits.length);
+               int recordCount = 0;
+               Row row =  new Row(5);
+               for (int i = 0; i < splits.length; i++) {
+                       jdbcInputFormat.open(splits[i]);
+                       while (!jdbcInputFormat.reachedEnd()) {
+                               Row next = jdbcInputFormat.nextRecord(row);
+                               if (next == null) {
+                                       break;
+                               }
+                               assertFieldTypes(next);
+                               assertMatchesTestData(recordCount, next);
+                               recordCount++;
+                       }
+                       jdbcInputFormat.close();
+               }
+               jdbcInputFormat.closeInputFormat();
+               Assert.assertEquals(testData.length, recordCount);
+       }
+
+       @Test
+       public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException {
+               Serializable[][] queryParameters = new String[2][1];
+               queryParameters[0] = new String[]{"Kumar"};
+               queryParameters[1] = new String[]{"Tan Ah Teck"};
+               ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setDBUrl(DB_URL)
+                               .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
+                               .setRowTypeInfo(rowTypeInfo)
+                               .setParametersProvider(paramProvider)
+                               .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+                               .finish();
+               jdbcInputFormat.openInputFormat();
+               InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
+               //this query exploits parallelism (1 split for every queryParameters row)
+               Assert.assertEquals(queryParameters.length, splits.length);
+               int recordCount = 0;
+               Row row =  new Row(5);
+               for (int i = 0; i < splits.length; i++) {
+                       jdbcInputFormat.open(splits[i]);
+                       while (!jdbcInputFormat.reachedEnd()) {
+                               Row next = jdbcInputFormat.nextRecord(row);
+                               if (next == null) {
+                                       break;
+                               }
+                               assertFieldTypes(next);
+                               recordCount++;
+                       }
+                       jdbcInputFormat.close();
+               }
+               Assert.assertEquals(3, recordCount);
+               jdbcInputFormat.closeInputFormat();
+       }
+
+       @Test
+       public void testEmptyResults() throws IOException, InstantiationException, IllegalAccessException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setDBUrl(DB_URL)
+                               .setQuery(SELECT_EMPTY)
+                               .setRowTypeInfo(rowTypeInfo)
+                               .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+                               .finish();
+               jdbcInputFormat.openInputFormat();
+               jdbcInputFormat.open(null);
+               Row row = new Row(5);
+               int recordsCnt = 0;
+               while (!jdbcInputFormat.reachedEnd()) {
+                       Assert.assertNull(jdbcInputFormat.nextRecord(row));
+                       recordsCnt++;
+               }
+               jdbcInputFormat.close();
+               jdbcInputFormat.closeInputFormat();
+               Assert.assertEquals(0, recordsCnt);
+       }
+
+       /** Checks the runtime class of each of the five fields of the row, skipping null fields. */
+       private static void assertFieldTypes(Row row) {
+               assertFieldType("Field 0 should be int", Integer.class, row.productElement(0));
+               assertFieldType("Field 1 should be String", String.class, row.productElement(1));
+               assertFieldType("Field 2 should be String", String.class, row.productElement(2));
+               assertFieldType("Field 3 should be double", Double.class, row.productElement(3));
+               assertFieldType("Field 4 should be int", Integer.class, row.productElement(4));
+       }
+
+       /** Asserts that a non-null field is an instance of exactly the expected class. */
+       private static void assertFieldType(String message, Class<?> expectedType, Object field) {
+               if (field != null) {
+                       Assert.assertEquals(message, expectedType, field.getClass());
+               }
+       }
+
+       /** Compares the row with the non-null entries of the testData row at the given index. */
+       private static void assertMatchesTestData(int recordIndex, Row row) {
+               for (int x = 0; x < 5; x++) {
+                       if (testData[recordIndex][x] != null) {
+                               Assert.assertEquals(testData[recordIndex][x], row.productElement(x));
+                       }
+               }
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
new file mode 100644
index 0000000..086a84c
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JDBCOutputFormatTest extends JDBCTestBase {
+
+       private JDBCOutputFormat jdbcOutputFormat;
+       private Tuple5<Integer, String, String, Double, String> tuple5 = new 
Tuple5<>();
+
+       @After
+       public void tearDown() throws IOException {
+               if (jdbcOutputFormat != null) {
+                       jdbcOutputFormat.close();
+               }
+               jdbcOutputFormat = null;
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidDriver() throws IOException {
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               
.setDrivername("org.apache.derby.jdbc.idontexist")
+                               .setDBUrl(DB_URL)
+                               .setQuery(String.format(INSERT_TEMPLATE, 
INPUT_TABLE))
+                               .finish();
+               jdbcOutputFormat.open(0, 1);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidURL() throws IOException {
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+                               .setQuery(String.format(INSERT_TEMPLATE, 
INPUT_TABLE))
+                               .finish();
+               jdbcOutputFormat.open(0, 1);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidQuery() throws IOException {
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setDBUrl(DB_URL)
+                               .setQuery("iamnotsql")
+                               .finish();
+               jdbcOutputFormat.open(0, 1);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testIncompleteConfiguration() throws IOException {
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setQuery(String.format(INSERT_TEMPLATE, 
INPUT_TABLE))
+                               .finish();
+       }
+
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testIncompatibleTypes() throws IOException {
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setDBUrl(DB_URL)
+                               .setQuery(String.format(INSERT_TEMPLATE, 
INPUT_TABLE))
+                               .finish();
+               jdbcOutputFormat.open(0, 1);
+
+               tuple5.setField(4, 0);
+               tuple5.setField("hello", 1);
+               tuple5.setField("world", 2);
+               tuple5.setField(0.99, 3);
+               tuple5.setField("imthewrongtype", 4);
+
+               Row row = new Row(tuple5.getArity());
+               for (int i = 0; i < tuple5.getArity(); i++) {
+                       row.setField(i, tuple5.getField(i));
+               }
+               jdbcOutputFormat.writeRecord(row);
+               jdbcOutputFormat.close();
+       }
+
+       @Test
+       public void testJDBCOutputFormat() throws IOException, 
InstantiationException, IllegalAccessException {
+
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername(DRIVER_CLASS)
+                               .setDBUrl(DB_URL)
+                               .setQuery(String.format(INSERT_TEMPLATE, 
OUTPUT_TABLE))
+                               .finish();
+               jdbcOutputFormat.open(0, 1);
+
+               for (int i = 0; i < testData.length; i++) {
+                       Row row = new Row(testData[i].length);
+                       for (int j = 0; j < testData[i].length; j++) {
+                               row.setField(j, testData[i][j]);
+                       }
+                       jdbcOutputFormat.writeRecord(row);
+               }
+
+               jdbcOutputFormat.close();
+
+               try (
+                       Connection dbConn = 
DriverManager.getConnection(JDBCTestBase.DB_URL);
+                       PreparedStatement statement = 
dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
+                       ResultSet resultSet = statement.executeQuery()
+               ) {
+                       int recordCount = 0;
+                       while (resultSet.next()) {
+                               Row row = new Row(tuple5.getArity());
+                               for (int i = 0; i < tuple5.getArity(); i++) {
+                                       row.setField(i, resultSet.getObject(i + 
1));
+                               }
+                               if (row.productElement(0) != null) {
+                                       Assert.assertEquals("Field 0 should be 
int", Integer.class, row.productElement(0).getClass());
+                               }
+                               if (row.productElement(1) != null) {
+                                       Assert.assertEquals("Field 1 should be 
String", String.class, row.productElement(1).getClass());
+                               }
+                               if (row.productElement(2) != null) {
+                                       Assert.assertEquals("Field 2 should be 
String", String.class, row.productElement(2).getClass());
+                               }
+                               if (row.productElement(3) != null) {
+                                       Assert.assertEquals("Field 3 should be 
float", Double.class, row.productElement(3).getClass());
+                               }
+                               if (row.productElement(4) != null) {
+                                       Assert.assertEquals("Field 4 should be 
int", Integer.class, row.productElement(4).getClass());
+                               }
+
+                               for (int x = 0; x < tuple5.getArity(); x++) {
+                                       if 
(JDBCTestBase.testData[recordCount][x] != null) {
+                                               
Assert.assertEquals(JDBCTestBase.testData[recordCount][x], 
row.productElement(x));
+                                       }
+                               }
+
+                               recordCount++;
+                       }
+                       Assert.assertEquals(JDBCTestBase.testData.length, 
recordCount);
+               } catch (SQLException e) {
+                       Assert.fail("JDBC OutputFormat test failed. " + 
e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
new file mode 100644
index 0000000..69ad693
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+/**
+ * Base test class for JDBC Input and Output formats.
+ *
+ * <p>Before the tests of a class run, the input and output tables are created in the Derby
+ * database addressed by {@link #DB_URL} and the input table is filled with {@link #testData}.
+ * Both tables are dropped again after the last test of the class.
+ */
+public class JDBCTestBase {
+
+       public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
+       public static final String DB_URL = "jdbc:derby:memory:ebookshop";
+       public static final String INPUT_TABLE = "books";
+       public static final String OUTPUT_TABLE = "newbooks";
+       public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
+       public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
+       public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
+       public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
+       public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
+       public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
+
+       /**
+        * Connection most recently opened by the class-level set-up or tear-down. It is already
+        * closed when those methods return; the field is only kept for existing subclasses.
+        */
+       protected static Connection conn;
+
+       /** Fixture rows in column order (id, title, author, price, qty); the last row has a null price. */
+       public static final Object[][] testData = {
+                       {1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11},
+                       {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
+                       {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
+                       {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
+                       {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55},
+                       {1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66},
+                       {1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77},
+                       {1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88},
+                       {1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99},
+                       {1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010}};
+
+       /** Flink type of each column of {@link #testData}, in column order. */
+       public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
+               BasicTypeInfo.INT_TYPE_INFO,
+               BasicTypeInfo.STRING_TYPE_INFO,
+               BasicTypeInfo.STRING_TYPE_INFO,
+               BasicTypeInfo.DOUBLE_TYPE_INFO,
+               BasicTypeInfo.INT_TYPE_INFO
+       };
+
+       public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+
+       /**
+        * Builds the CREATE TABLE statement for a table with the book schema.
+        *
+        * @param tableName name of the table to create
+        * @return the DDL statement
+        */
+       public static String getCreateQuery(String tableName) {
+               StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
+               sqlQueryBuilder.append(tableName).append(" (");
+               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+               sqlQueryBuilder.append("PRIMARY KEY (id))");
+               return sqlQueryBuilder.toString();
+       }
+
+       /**
+        * Builds one multi-row INSERT statement that loads {@link #testData} into the books table.
+        *
+        * <p>The values are concatenated into the statement, which is acceptable only because
+        * they are the constants above. A null cell is appended as the text {@code null}.
+        *
+        * @return the INSERT statement
+        */
+       public static String getInsertQuery() {
+               StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+               for (int i = 0; i < JDBCTestBase.testData.length; i++) {
+                       sqlQueryBuilder.append("(")
+                       .append(JDBCTestBase.testData[i][0]).append(",'")
+                       .append(JDBCTestBase.testData[i][1]).append("','")
+                       .append(JDBCTestBase.testData[i][2]).append("',")
+                       .append(JDBCTestBase.testData[i][3]).append(",")
+                       .append(JDBCTestBase.testData[i][4]).append(")");
+                       if (i < JDBCTestBase.testData.length - 1) {
+                               sqlQueryBuilder.append(",");
+                       }
+               }
+               return sqlQueryBuilder.toString();
+       }
+
+       /**
+        * Stream that discards every byte. Its fully qualified field name is stored in the
+        * {@code derby.stream.error.field} system property by the set-up methods below.
+        */
+       public static final OutputStream DEV_NULL = new OutputStream() {
+               @Override
+               public void write(int b) {
+               }
+       };
+
+       /**
+        * Creates both tables and loads the fixture rows using a connection of its own; the
+        * {@link #conn} field is not touched.
+        *
+        * @throws Exception if the driver cannot be loaded or a statement fails
+        */
+       public static void prepareTestDb() throws Exception {
+               System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+               Class.forName(DRIVER_CLASS);
+               try (Connection connection = DriverManager.getConnection(DB_URL + ";create=true")) {
+                       createAndFillTables(connection);
+               }
+       }
+
+       /**
+        * Prepares the database once per test class.
+        *
+        * @throws SQLException if creating or filling the tables fails
+        */
+       @BeforeClass
+       public static void setUpClass() throws SQLException {
+               System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+               try {
+                       prepareDerbyDatabase();
+               } catch (ClassNotFoundException e) {
+                       // the exception carries nothing beyond the class name, which the message repeats
+                       Assert.fail("JDBC driver class " + DRIVER_CLASS + " was not found on the test classpath");
+               }
+       }
+
+       /** Loads the driver, then creates and fills the tables; the connection is closed even on failure. */
+       private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
+               Class.forName(DRIVER_CLASS);
+               try (Connection connection = DriverManager.getConnection(DB_URL + ";create=true")) {
+                       conn = connection;
+                       createAndFillTables(connection);
+               }
+       }
+
+       /** Creates the input and output tables and inserts the fixture rows into the input table. */
+       private static void createAndFillTables(Connection connection) throws SQLException {
+               createTable(connection, INPUT_TABLE);
+               createTable(connection, OUTPUT_TABLE);
+               insertDataIntoInputTable(connection);
+       }
+
+       /** Creates one table with the book schema; the statement is closed even on failure. */
+       private static void createTable(Connection connection, String tableName) throws SQLException {
+               try (Statement stat = connection.createStatement()) {
+                       stat.executeUpdate(getCreateQuery(tableName));
+               }
+       }
+
+       /** Inserts the fixture rows into the input table; the statement is closed even on failure. */
+       private static void insertDataIntoInputTable(Connection connection) throws SQLException {
+               try (Statement stat = connection.createStatement()) {
+                       stat.execute(JDBCTestBase.getInsertQuery());
+               }
+       }
+
+       /** Drops the test tables once per test class. */
+       @AfterClass
+       public static void tearDownClass() {
+               cleanUpDerbyDatabases();
+       }
+
+       /**
+        * Drops the input and output tables. Any failure is reported as an {@link AssertionError}
+        * that carries the original exception as its cause.
+        */
+       private static void cleanUpDerbyDatabases() {
+               try {
+                       Class.forName(DRIVER_CLASS);
+                       try (
+                               Connection connection = DriverManager.getConnection(DB_URL + ";create=true");
+                               Statement stat = connection.createStatement()
+                       ) {
+                               conn = connection;
+                               stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
+                               stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
+                       }
+               } catch (ClassNotFoundException | SQLException e) {
+                       throw new AssertionError("Could not drop the JDBC test tables", e);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties 
b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2fb9345
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/resources/log4j-test.properties
@@ -0,0 +1,19 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml 
b/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
new file mode 100644
index 0000000..dcb33eb
--- /dev/null
+++ b/flink-connectors/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+
+       <artifactId>flink-connectors</artifactId>
+       <name>flink-connectors</name>
+       <packaging>pom</packaging>
+
+       <modules>
+               <module>flink-avro</module>
+               <module>flink-jdbc</module>
+               <module>flink-hadoop-compatibility</module>
+               <module>flink-hbase</module>
+               <module>flink-hcatalog</module>
+               <module>flink-connector-flume</module>
+               <module>flink-connector-kafka-base</module>
+               <module>flink-connector-kafka-0.8</module>
+               <module>flink-connector-kafka-0.9</module>
+               <module>flink-connector-kafka-0.10</module>
+               <module>flink-connector-elasticsearch</module>
+               <module>flink-connector-elasticsearch2</module>
+               <module>flink-connector-rabbitmq</module>
+               <module>flink-connector-twitter</module>
+               <module>flink-connector-nifi</module>
+               <module>flink-connector-cassandra</module>
+               <module>flink-connector-redis</module>
+               <module>flink-connector-filesystem</module>
+       </modules>
+
+       <!-- See main pom.xml for explanation of profiles -->
+       <profiles>
+               <!--
+                       We include the kinesis module only optionally because 
it contains a dependency
+                       licenced under the "Amazon Software License".
+                       In accordance with the discussion in 
https://issues.apache.org/jira/browse/LEGAL-198
+                       this is an optional module for Flink.
+               -->
+               <profile>
+                       <id>include-kinesis</id>
+                       <modules>
+                               <module>flink-connector-kinesis</module>
+                       </modules>
+               </profile>
+       </profiles>
+       
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml 
b/flink-streaming-connectors/flink-connector-cassandra/pom.xml
deleted file mode 100644
index 3a1731c..0000000
--- a/flink-streaming-connectors/flink-connector-cassandra/pom.xml
+++ /dev/null
@@ -1,179 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-streaming-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-connector-cassandra_2.10</artifactId>
-       <name>flink-connector-cassandra</name>
-
-       <packaging>jar</packaging>
-
-       <!-- Allow users to pass custom connector versions -->
-       <properties>
-               <cassandra.version>2.2.5</cassandra.version>
-               <driver.version>3.0.0</driver.version>
-       </properties>
-
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-surefire-plugin</artifactId>
-                               <configuration>
-                                       <reuseForks>true</reuseForks>
-                                       <forkCount>1</forkCount>
-                                       <argLine>-Xms256m -Xmx2800m 
-Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
-                               </configuration>
-                       </plugin>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-shade-plugin</artifactId>
-                               <version>2.4.1</version>
-                               <executions>
-                                       <!-- Run shade goal on package phase -->
-                                       <execution>
-                                               <id>shade-flink</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>shade</goal>
-                                               </goals>
-                                               <configuration 
combine.self="override">
-                                                       
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
-                                                       <artifactSet>
-                                                               <includes>
-                                                                       
<include>com.datastax.cassandra:cassandra-driver-core</include>
-                                                                       
<include>com.datastax.cassandra:cassandra-driver-mapping</include>
-                                                                       
<include>com.google.guava:guava</include>
-                                                               </includes>
-                                                       </artifactSet>
-                                                       <relocations>
-                                                               <relocation>
-                                                                       
<pattern>com.google</pattern>
-                                                                       
<shadedPattern>org.apache.flink.cassandra.shaded.com.google</shadedPattern>
-                                                                       
<excludes>
-                                                                               
<exclude>com.google.protobuf.**</exclude>
-                                                                               
<exclude>com.google.inject.**</exclude>
-                                                                       
</excludes>
-                                                               </relocation>
-                                                       </relocations>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-       </build>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-               <dependency>
-                       <groupId>com.datastax.cassandra</groupId>
-                       <artifactId>cassandra-driver-core</artifactId>
-                       <version>${driver.version}</version>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.slf4j</groupId>
-                                       
<artifactId>log4j-over-slf4j</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>ch.qos.logback</groupId>
-                                       <artifactId>logback-classic</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-               <dependency>
-                       <groupId>com.datastax.cassandra</groupId>
-                       <artifactId>cassandra-driver-mapping</artifactId>
-                       <version>${driver.version}</version>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.slf4j</groupId>
-                                       
<artifactId>log4j-over-slf4j</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>ch.qos.logback</groupId>
-                                       <artifactId>logback-classic</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-               <dependency>
-                       <groupId>com.google.guava</groupId>
-                       <artifactId>guava</artifactId>
-                       <version>${guava.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-tests_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.cassandra</groupId>
-                       <artifactId>cassandra-all</artifactId>
-                       <version>${cassandra.version}</version>
-                       <scope>test</scope>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.slf4j</groupId>
-                                       
<artifactId>log4j-over-slf4j</artifactId>
-                               </exclusion>
-                               <exclusion>
-                                       <groupId>ch.qos.logback</groupId>
-                                       <artifactId>logback-classic</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-       </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
deleted file mode 100644
index 849e023..0000000
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.batch.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.google.common.base.Strings;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * InputFormat to read data from Apache Cassandra and generate ${@link Tuple}.
- *
- * @param <OUT> type of Tuple
- */
-public class CassandraInputFormat<OUT extends Tuple> extends 
RichInputFormat<OUT, InputSplit> implements NonParallelInput {
-       private static final Logger LOG = 
LoggerFactory.getLogger(CassandraInputFormat.class);
-
-       private final String query;
-       private final ClusterBuilder builder;
-
-       private transient Cluster cluster;
-       private transient Session session;
-       private transient ResultSet resultSet;
-
-       public CassandraInputFormat(String query, ClusterBuilder builder) {
-               Preconditions.checkArgument(!Strings.isNullOrEmpty(query), 
"Query cannot be null or empty");
-               Preconditions.checkArgument(builder != null, "Builder cannot be 
null");
-
-               this.query = query;
-               this.builder = builder;
-       }
-
-       @Override
-       public void configure(Configuration parameters) {
-               this.cluster = builder.getCluster();
-       }
-
-       @Override
-       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) 
throws IOException {
-               return cachedStatistics;
-       }
-
-       /**
-        * Opens a Session and executes the query.
-        *
-        * @param ignored
-        * @throws IOException
-        */
-       @Override
-       public void open(InputSplit ignored) throws IOException {
-               this.session = cluster.connect();
-               this.resultSet = session.execute(query);
-       }
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               return resultSet.isExhausted();
-       }
-
-       @Override
-       public OUT nextRecord(OUT reuse) throws IOException {
-               final Row item = resultSet.one();
-               for (int i = 0; i < reuse.getArity(); i++) {
-                       reuse.setField(item.getObject(i), i);
-               }
-               return reuse;
-       }
-
-       @Override
-       public InputSplit[] createInputSplits(int minNumSplits) throws 
IOException {
-               GenericInputSplit[] split = {new GenericInputSplit(0, 1)};
-               return split;
-       }
-
-       @Override
-       public InputSplitAssigner getInputSplitAssigner(InputSplit[] 
inputSplits) {
-               return new DefaultInputSplitAssigner(inputSplits);
-       }
-
-       /**
-        * Closes all resources used.
-        */
-       @Override
-       public void close() throws IOException {
-               try {
-                       if (session != null) {
-                               session.close();
-                       }
-               } catch (Exception e) {
-                       LOG.error("Error while closing session.", e);
-               }
-
-               try {
-                       if (cluster != null ) {
-                               cluster.close();
-                       }
-               } catch (Exception e) {
-                       LOG.error("Error while closing cluster.", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
deleted file mode 100644
index 15d8fb3..0000000
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.batch.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import com.google.common.base.Strings;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} into 
Apache Cassandra.
- *
- * @param <OUT> type of Tuple
- */
-public class CassandraOutputFormat<OUT extends Tuple> extends 
RichOutputFormat<OUT> {
-       private static final Logger LOG = 
LoggerFactory.getLogger(CassandraOutputFormat.class);
-
-       private final String insertQuery;
-       private final ClusterBuilder builder;
-
-       private transient Cluster cluster;
-       private transient Session session;
-       private transient PreparedStatement prepared;
-       private transient FutureCallback<ResultSet> callback;
-       private transient Throwable exception = null;
-
-       public CassandraOutputFormat(String insertQuery, ClusterBuilder 
builder) {
-               
Preconditions.checkArgument(!Strings.isNullOrEmpty(insertQuery), "Query cannot 
be null or empty");
-               Preconditions.checkArgument(builder != null, "Builder cannot be 
null");
-
-               this.insertQuery = insertQuery;
-               this.builder = builder;
-       }
-
-       @Override
-       public void configure(Configuration parameters) {
-               this.cluster = builder.getCluster();
-       }
-
-       /**
-        * Opens a Session to Cassandra and initializes the prepared statement.
-        *
-        * @param taskNumber The number of the parallel instance.
-        * @throws IOException Thrown, if the output could not be opened due to 
an
-        *                     I/O problem.
-        */
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException {
-               this.session = cluster.connect();
-               this.prepared = session.prepare(insertQuery);
-               this.callback = new FutureCallback<ResultSet>() {
-                       @Override
-                       public void onSuccess(ResultSet ignored) {
-                       }
-
-                       @Override
-                       public void onFailure(Throwable t) {
-                               exception = t;
-                       }
-               };
-       }
-
-       @Override
-       public void writeRecord(OUT record) throws IOException {
-               if (exception != null) {
-                       throw new IOException("write record failed", exception);
-               }
-
-               Object[] fields = new Object[record.getArity()];
-               for (int i = 0; i < record.getArity(); i++) {
-                       fields[i] = record.getField(i);
-               }
-               ResultSetFuture result = 
session.executeAsync(prepared.bind(fields));
-               Futures.addCallback(result, callback);
-       }
-
-       /**
-        * Closes all resources used.
-        */
-       @Override
-       public void close() throws IOException {
-               try {
-                       if (session != null) {
-                               session.close();
-                       }
-               } catch (Exception e) {
-                       LOG.error("Error while closing session.", e);
-               }
-
-               try {
-                       if (cluster != null ) {
-                               cluster.close();
-                       }
-               } catch (Exception e) {
-                       LOG.error("Error while closing cluster.", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
deleted file mode 100644
index 63b76da..0000000
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
- * database.
- * 
- * <p>Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
- */
-public class CassandraCommitter extends CheckpointCommitter {
-
-       private static final long serialVersionUID = 1L;
-       
-       private final ClusterBuilder builder;
-       private transient Cluster cluster;
-       private transient Session session;
-
-       private String keySpace = "flink_auxiliary";
-       private String table = "checkpoints_";
-
-       /**
-        * A cache of the last committed checkpoint ids per subtask index. This 
is used to
-        * avoid redundant round-trips to Cassandra (see {@link 
#isCheckpointCommitted(int, long)}.
-        */
-       private final Map<Integer, Long> lastCommittedCheckpoints = new 
HashMap<>();
-
-       public CassandraCommitter(ClusterBuilder builder) {
-               this.builder = builder;
-               ClosureCleaner.clean(builder, true);
-       }
-
-       public CassandraCommitter(ClusterBuilder builder, String keySpace) {
-               this(builder);
-               this.keySpace = keySpace;
-       }
-
-       /**
-        * Internally used to set the job ID after instantiation.
-        */
-       public void setJobId(String id) throws Exception {
-               super.setJobId(id);
-               table += id;
-       }
-
-       /**
-        * Generates the necessary tables to store information.
-        *
-        * @throws Exception
-        */
-       @Override
-       public void createResource() throws Exception {
-               cluster = builder.getCluster();
-               session = cluster.connect();
-
-               session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s 
with replication={'class':'SimpleStrategy', 'replication_factor':1};", 
keySpace));
-               session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s 
(sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, 
sub_id));", keySpace, table));
-
-               try {
-                       session.close();
-               } catch (Exception e) {
-                       LOG.error("Error while closing session.", e);
-               }
-               try {
-                       cluster.close();
-               } catch (Exception e) {
-                       LOG.error("Error while closing cluster.", e);
-               }
-       }
-
-       @Override
-       public void open() throws Exception {
-               if (builder == null) {
-                       throw new RuntimeException("No ClusterBuilder was 
set.");
-               }
-               cluster = builder.getCluster();
-               session = cluster.connect();
-       }
-
-       @Override
-       public void close() throws Exception {
-               this.lastCommittedCheckpoints.clear();
-               try {
-                       session.close();
-               } catch (Exception e) {
-                       LOG.error("Error while closing session.", e);
-               }
-               try {
-                       cluster.close();
-               } catch (Exception e) {
-                       LOG.error("Error while closing cluster.", e);
-               }
-       }
-
-       @Override
-       public void commitCheckpoint(int subtaskIdx, long checkpointId) {
-               String statement = String.format(
-                       "UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' 
and sub_id=%d;",
-                       keySpace, table, checkpointId, operatorId, subtaskIdx);
-
-               session.execute(statement);
-               lastCommittedCheckpoints.put(subtaskIdx, checkpointId);
-       }
-
-       @Override
-       public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) 
{
-               // Pending checkpointed buffers are committed in ascending 
order of their
-               // checkpoint id. This way we can tell if a checkpointed buffer 
was committed
-               // just by asking the third-party storage system for the last 
checkpoint id
-               // committed by the specified subtask.
-
-               Long lastCommittedCheckpoint = 
lastCommittedCheckpoints.get(subtaskIdx);
-               if (lastCommittedCheckpoint == null) {
-                       String statement = String.format(
-                               "SELECT checkpoint_id FROM %s.%s where 
sink_id='%s' and sub_id=%d;",
-                               keySpace, table, operatorId, subtaskIdx);
-
-                       Iterator<Row> resultIt = 
session.execute(statement).iterator();
-                       if (resultIt.hasNext()) {
-                               lastCommittedCheckpoint = 
resultIt.next().getLong("checkpoint_id");
-                               lastCommittedCheckpoints.put(subtaskIdx, 
lastCommittedCheckpoint);
-                       }
-               }
-               return lastCommittedCheckpoint != null && checkpointId <= 
lastCommittedCheckpoint;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
deleted file mode 100644
index 650c481..0000000
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Flink Sink to save data into a Cassandra cluster using 
- * <a 
href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html";>Mapper</a>,
- * which it uses annotations from
- * <a 
href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/annotations/package-summary.html";>
- * com.datastax.driver.mapping.annotations</a>.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
-
-       private static final long serialVersionUID = 1L;
-
-       protected final Class<IN> clazz;
-       protected transient Mapper<IN> mapper;
-       protected transient MappingManager mappingManager;
-
-       /**
-        * The main constructor for creating CassandraPojoSink
-        *
-        * @param clazz Class<IN> instance
-        */
-       public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
-               super(builder);
-               this.clazz = clazz;
-       }
-
-       @Override
-       public void open(Configuration configuration) {
-               super.open(configuration);
-               try {
-                       this.mappingManager = new MappingManager(session);
-                       this.mapper = mappingManager.mapper(clazz);
-               } catch (Exception e) {
-                       throw new RuntimeException("Cannot create 
CassandraPojoSink with input: " + clazz.getSimpleName(), e);
-               }
-       }
-
-       @Override
-       public ListenableFuture<Void> send(IN value) {
-               return mapper.saveAsync(value);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
deleted file mode 100644
index 180b638..0000000
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-
-/**
- * This class wraps different Cassandra sink implementations to provide a 
common interface for all of them.
- *
- * @param <IN> input type
- */
-public class CassandraSink<IN> {
-       private final boolean useDataStreamSink;
-       private DataStreamSink<IN> sink1;
-       private SingleOutputStreamOperator<IN> sink2;
-
-       private CassandraSink(DataStreamSink<IN> sink) {
-               sink1 = sink;
-               useDataStreamSink = true;
-       }
-
-       private CassandraSink(SingleOutputStreamOperator<IN> sink) {
-               sink2 = sink;
-               useDataStreamSink = false;
-       }
-
-       private SinkTransformation<IN> getSinkTransformation() {
-               return sink1.getTransformation();
-       }
-
-       private StreamTransformation<IN> getStreamTransformation() {
-               return sink2.getTransformation();
-       }
-
-       /**
-        * Sets the name of this sink. This name is
-        * used by the visualization and logging during runtime.
-        *
-        * @return The named sink.
-        */
-       public CassandraSink<IN> name(String name) {
-               if (useDataStreamSink) {
-                       getSinkTransformation().setName(name);
-               } else {
-                       getStreamTransformation().setName(name);
-               }
-               return this;
-       }
-
-       /**
-        * Sets an ID for this operator.
-        * <p/>
-        * <p>The specified ID is used to assign the same operator ID across job
-        * submissions (for example when starting a job from a savepoint).
-        * <p/>
-        * <p><strong>Important</strong>: this ID needs to be unique per
-        * transformation and job. Otherwise, job submission will fail.
-        *
-        * @param uid The unique user-specified ID of this transformation.
-        * @return The operator with the specified ID.
-        */
-       public CassandraSink<IN> uid(String uid) {
-               if (useDataStreamSink) {
-                       getSinkTransformation().setUid(uid);
-               } else {
-                       getStreamTransformation().setUid(uid);
-               }
-               return this;
-       }
-
-       /**
-        * Sets the parallelism for this sink. The degree must be higher than 
zero.
-        *
-        * @param parallelism The parallelism for this sink.
-        * @return The sink with set parallelism.
-        */
-       public CassandraSink<IN> setParallelism(int parallelism) {
-               if (useDataStreamSink) {
-                       getSinkTransformation().setParallelism(parallelism);
-               } else {
-                       getStreamTransformation().setParallelism(parallelism);
-               }
-               return this;
-       }
-
-       /**
-        * Turns off chaining for this operator so thread co-location will not 
be
-        * used as an optimization.
-        * <p/>
-        * <p/>
-        * Chaining can be turned off for the whole
-        * job by {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
-        * however it is not advised for performance considerations.
-        *
-        * @return The sink with chaining disabled
-        */
-       public CassandraSink<IN> disableChaining() {
-               if (useDataStreamSink) {
-                       
getSinkTransformation().setChainingStrategy(ChainingStrategy.NEVER);
-               } else {
-                       
getStreamTransformation().setChainingStrategy(ChainingStrategy.NEVER);
-               }
-               return this;
-       }
-
-       /**
-        * Sets the slot sharing group of this operation. Parallel instances of
-        * operations that are in the same slot sharing group will be 
co-located in the same
-        * TaskManager slot, if possible.
-        * <p/>
-        * <p>Operations inherit the slot sharing group of input operations if 
all input operations
-        * are in the same slot sharing group and no slot sharing group was 
explicitly specified.
-        * <p/>
-        * <p>Initially an operation is in the default slot sharing group. An 
operation can be put into
-        * the default group explicitly by setting the slot sharing group to 
{@code "default"}.
-        *
-        * @param slotSharingGroup The slot sharing group name.
-        */
-       public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
-               if (useDataStreamSink) {
-                       
getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
-               } else {
-                       
getStreamTransformation().setSlotSharingGroup(slotSharingGroup);
-               }
-               return this;
-       }
-
-       /**
-        * Writes a DataStream into a Cassandra database.
-        *
-        * @param input input DataStream
-        * @param <IN>  input type
-        * @return CassandraSinkBuilder, to further configure the sink
-        */
-       public static <IN, T extends Tuple> CassandraSinkBuilder<IN> 
addSink(DataStream<IN> input) {
-               if (input.getType() instanceof TupleTypeInfo) {
-                       DataStream<T> tupleInput = (DataStream<T>) input;
-                       return (CassandraSinkBuilder<IN>) new 
CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), 
tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
-               } else {
-                       return new CassandraPojoSinkBuilder<>(input, 
input.getType(), 
input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
-               }
-       }
-
-       public abstract static class CassandraSinkBuilder<IN> {
-               protected final DataStream<IN> input;
-               protected final TypeSerializer<IN> serializer;
-               protected final TypeInformation<IN> typeInfo;
-               protected ClusterBuilder builder;
-               protected String query;
-               protected CheckpointCommitter committer;
-               protected boolean isWriteAheadLogEnabled;
-
-               public CassandraSinkBuilder(DataStream<IN> input, 
TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
-                       this.input = input;
-                       this.typeInfo = typeInfo;
-                       this.serializer = serializer;
-               }
-
-               /**
-                * Sets the query that is to be executed for every record.
-                *
-                * @param query query to use
-                * @return this builder
-                */
-               public CassandraSinkBuilder<IN> setQuery(String query) {
-                       this.query = query;
-                       return this;
-               }
-
-               /**
-                * Sets the cassandra host to connect to.
-                *
-                * @param host host to connect to
-                * @return this builder
-                */
-               public CassandraSinkBuilder<IN> setHost(String host) {
-                       return setHost(host, 9042);
-               }
-
-               /**
-                * Sets the cassandra host/port to connect to.
-                *
-                * @param host host to connect to
-                * @param port port to connect to
-                * @return this builder
-                */
-               public CassandraSinkBuilder<IN> setHost(final String host, 
final int port) {
-                       if (this.builder != null) {
-                               throw new IllegalArgumentException("Builder was 
already set. You must use either setHost() or setClusterBuilder().");
-                       }
-                       this.builder = new ClusterBuilder() {
-                               @Override
-                               protected Cluster buildCluster(Cluster.Builder 
builder) {
-                                       return 
builder.addContactPoint(host).withPort(port).build();
-                               }
-                       };
-                       return this;
-               }
-
-               /**
-                * Sets the ClusterBuilder for this sink. A ClusterBuilder is 
used to configure the connection to cassandra.
-                *
-                * @param builder ClusterBuilder to configure the connection to 
cassandra
-                * @return this builder
-                */
-               public CassandraSinkBuilder<IN> 
setClusterBuilder(ClusterBuilder builder) {
-                       if (this.builder != null) {
-                               throw new IllegalArgumentException("Builder was 
already set. You must use either setHost() or setClusterBuilder().");
-                       }
-                       this.builder = builder;
-                       return this;
-               }
-
-               /**
-                * Enables the write-ahead log, which allows exactly-once 
processing for non-deterministic algorithms that use
-                * idempotent updates.
-                *
-                * @return this builder
-                */
-               public CassandraSinkBuilder<IN> enableWriteAheadLog() {
-                       this.isWriteAheadLogEnabled = true;
-                       return this;
-               }
-
-               /**
-                * Enables the write-ahead log, which allows exactly-once 
processing for non-deterministic algorithms that use
-                * idempotent updates.
-                *
-                * @param committer CheckpointCommitter, that stores 
informationa bout completed checkpoints in an external
-                *                  resource. By default this information is 
stored within a separate table within Cassandra.
-                * @return this builder
-                */
-               public CassandraSinkBuilder<IN> 
enableWriteAheadLog(CheckpointCommitter committer) {
-                       this.isWriteAheadLogEnabled = true;
-                       this.committer = committer;
-                       return this;
-               }
-
-               /**
-                * Finalizes the configuration of this sink.
-                *
-                * @return finalized sink
-                * @throws Exception
-                */
-               public abstract CassandraSink<IN> build() throws Exception;
-
-               protected void sanityCheck() {
-                       if (builder == null) {
-                               throw new IllegalArgumentException("Cassandra 
host information must be supplied using either setHost() or 
setClusterBuilder().");
-                       }
-               }
-       }
-
-       public static class CassandraTupleSinkBuilder<IN extends Tuple> extends 
CassandraSinkBuilder<IN> {
-               public CassandraTupleSinkBuilder(DataStream<IN> input, 
TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
-                       super(input, typeInfo, serializer);
-               }
-
-               @Override
-               protected void sanityCheck() {
-                       super.sanityCheck();
-                       if (query == null || query.length() == 0) {
-                               throw new IllegalArgumentException("Query must 
not be null or empty.");
-                       }
-               }
-
-               @Override
-               public CassandraSink<IN> build() throws Exception {
-                       sanityCheck();
-                       if (isWriteAheadLogEnabled) {
-                               return committer == null
-                                       ? new 
CassandraSink<>(input.transform("Cassandra Sink", null, new 
CassandraTupleWriteAheadSink<>(query, serializer, builder, new 
CassandraCommitter(builder))))
-                                       : new 
CassandraSink<>(input.transform("Cassandra Sink", null, new 
CassandraTupleWriteAheadSink<>(query, serializer, builder, committer)));
-                       } else {
-                               return new CassandraSink<>(input.addSink(new 
CassandraTupleSink<IN>(query, builder)).name("Cassandra Sink"));
-                       }
-               }
-       }
-
-       public static class CassandraPojoSinkBuilder<IN> extends 
CassandraSinkBuilder<IN> {
-               public CassandraPojoSinkBuilder(DataStream<IN> input, 
TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
-                       super(input, typeInfo, serializer);
-               }
-
-               @Override
-               protected void sanityCheck() {
-                       super.sanityCheck();
-                       if (query != null) {
-                               throw new IllegalArgumentException("Specifying 
a query is not allowed when using a Pojo-Stream as input.");
-                       }
-               }
-
-               @Override
-               public CassandraSink<IN> build() throws Exception {
-                       sanityCheck();
-                       if (isWriteAheadLogEnabled) {
-                               throw new 
IllegalArgumentException("Exactly-once guarantees can only be provided for 
tuple types.");
-                       } else {
-                               return new CassandraSink<>(input.addSink(new 
CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
deleted file mode 100644
index 49b1efa..0000000
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} 
and {@link CassandraTupleSink}.
- *
- * @param <IN> Type of the elements emitted by this sink
- */
-public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
-       protected static final Logger LOG = 
LoggerFactory.getLogger(CassandraSinkBase.class);
-       protected transient Cluster cluster;
-       protected transient Session session;
-
-       protected transient Throwable exception = null;
-       protected transient FutureCallback<V> callback;
-
-       private final ClusterBuilder builder;
-
-       protected CassandraSinkBase(ClusterBuilder builder) {
-               this.builder = builder;
-               ClosureCleaner.clean(builder, true);
-       }
-
-       @Override
-       public void open(Configuration configuration) {
-               this.callback = new FutureCallback<V>() {
-                       @Override
-                       public void onSuccess(V ignored) {
-                       }
-
-                       @Override
-                       public void onFailure(Throwable t) {
-                               exception = t;
-                               LOG.error("Error while sending value.", t);
-                       }
-               };
-               this.cluster = builder.getCluster();
-               this.session = cluster.connect();
-       }
-
-       @Override
-       public void invoke(IN value) throws Exception {
-               if (exception != null) {
-                       throw new IOException("invoke() failed", exception);
-               }
-               ListenableFuture<V> result = send(value);
-               Futures.addCallback(result, callback);
-       }
-
-       public abstract ListenableFuture<V> send(IN value);
-
-       @Override
-       public void close() {
-               try {
-                       if (session != null) {
-                               session.close();
-                       }
-               } catch (Exception e) {
-                       LOG.error("Error while closing session.", e);
-               }
-               try {
-                       if (cluster != null) {
-                               cluster.close();
-                       }
-               } catch (Exception e) {
-                       LOG.error("Error while closing cluster.", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
deleted file mode 100644
index 0a9ef06..0000000
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleSink.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * Flink Sink to save data into a Cassandra cluster.
- *
- * @param <IN> Type of the elements emitted by this sink, it must extend 
{@link Tuple}
- */
-public class CassandraTupleSink<IN extends Tuple> extends 
CassandraSinkBase<IN, ResultSet> {
-       private final String insertQuery;
-       private transient PreparedStatement ps;
-
-       public CassandraTupleSink(String insertQuery, ClusterBuilder builder) {
-               super(builder);
-               this.insertQuery = insertQuery;
-       }
-
-       @Override
-       public void open(Configuration configuration) {
-               super.open(configuration);
-               this.ps = session.prepare(insertQuery);
-       }
-
-       @Override
-       public ListenableFuture<ResultSet> send(IN value) {
-               Object[] fields = extract(value);
-               return session.executeAsync(ps.bind(fields));
-       }
-
-       private Object[] extract(IN record) {
-               Object[] al = new Object[record.getArity()];
-               for (int i = 0; i < record.getArity(); i++) {
-                       al[i] = record.getField(i);
-               }
-               return al;
-       }
-}

Reply via email to