http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 
b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
deleted file mode 100644
index b4246f5..0000000
--- 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Array;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Arrays;
-
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * InputFormat to read data from a database and generate Rows.
- * The InputFormat has to be configured using the supplied JDBCInputFormatBuilder.
- * A valid RowTypeInfo must be properly configured in the builder, e.g.: <br/>
- *
- * <pre><code>
- * TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
- *             BasicTypeInfo.INT_TYPE_INFO,
- *             BasicTypeInfo.STRING_TYPE_INFO,
- *             BasicTypeInfo.STRING_TYPE_INFO,
- *             BasicTypeInfo.DOUBLE_TYPE_INFO,
- *             BasicTypeInfo.INT_TYPE_INFO
- *     };
- *
- * RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
- *
- * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- *                             
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- *                             .setDBUrl("jdbc:derby:memory:ebookshop")
- *                             .setQuery("select * from books")
- *                             .setRowTypeInfo(rowTypeInfo)
- *                             .finish();
- * </code></pre>
- *
- * In order to query the JDBC source in parallel, you need to provide a
- * parameterized query template (i.e. a valid {@link PreparedStatement}) and
- * a {@link ParameterValuesProvider} which provides binding values for the
- * query parameters. E.g.:<br/>
- *
- * <pre><code>
- *
- * Serializable[][] queryParameters = new String[2][1];
- * queryParameters[0] = new String[]{"Kumar"};
- * queryParameters[1] = new String[]{"Tan Ah Teck"};
- *
- * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- *                             
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
- *                             .setDBUrl("jdbc:derby:memory:ebookshop")
- *                             .setQuery("select * from books WHERE author = 
?")
- *                             .setRowTypeInfo(rowTypeInfo)
- *                             .setParametersProvider(new 
GenericParameterValuesProvider(queryParameters))
- *                             .finish();
- * </code></pre>
- *
- * @see Row
- * @see ParameterValuesProvider
- * @see PreparedStatement
- * @see DriverManager
- */
-public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> 
implements ResultTypeQueryable {
-
-       private static final long serialVersionUID = 1L;
-       private static final Logger LOG = 
LoggerFactory.getLogger(JDBCInputFormat.class);
-
-       private String username;
-       private String password;
-       private String drivername;
-       private String dbURL;
-       private String queryTemplate;
-       private int resultSetType;
-       private int resultSetConcurrency;
-       private RowTypeInfo rowTypeInfo;
-
-       private transient Connection dbConn;
-       private transient PreparedStatement statement;
-       private transient ResultSet resultSet;
-
-       private boolean hasNext;
-       private Object[][] parameterValues;
-
-       /**
-        * Creates an unconfigured format. The fields are populated by
-        * {@link JDBCInputFormatBuilder}, obtained through
-        * {@link #buildJDBCInputFormat()}.
-        */
-       public JDBCInputFormat() {
-               // intentionally empty
-       }
-
-       /**
-        * Returns the row type information this format was configured with.
-        *
-        * @return the configured {@link RowTypeInfo}
-        */
-       @Override
-       public RowTypeInfo getProducedType() {
-               return this.rowTypeInfo;
-       }
-
-       /**
-        * No-op: nothing is read from the given configuration.
-        *
-        * @param parameters ignored
-        */
-       @Override
-       public void configure(Configuration parameters) {
-               // nothing to configure
-       }
-
-       /**
-        * Loads the JDBC driver class, opens the database connection and prepares
-        * the query template with the configured result set type and concurrency.
-        *
-        * @throws IllegalArgumentException if the driver class cannot be found, or
-        *         if connecting or preparing the statement raises an SQLException
-        */
-       @Override
-       public void openInputFormat() {
-               //called once per inputFormat (on open)
-               try {
-                       Class.forName(drivername);
-                       // credentials are only passed along when a user name was configured
-                       dbConn = (username != null)
-                               ? DriverManager.getConnection(dbURL, username, password)
-                               : DriverManager.getConnection(dbURL);
-                       statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
-               } catch (ClassNotFoundException driverMissing) {
-                       throw new IllegalArgumentException("JDBC-Class not found. - " + driverMissing.getMessage(), driverMissing);
-               } catch (SQLException sqlFailure) {
-                       throw new IllegalArgumentException("open() failed." + sqlFailure.getMessage(), sqlFailure);
-               }
-       }
-
-       /**
-        * Closes the prepared statement and then the database connection, and
-        * drops the parameter values. An SQLException raised while closing is
-        * logged and not rethrown.
-        */
-       @Override
-       public void closeInputFormat() {
-               //called once per inputFormat (on close)
-               if (statement != null) {
-                       try {
-                               statement.close();
-                       } catch (SQLException statementFailure) {
-                               LOG.info("Inputformat Statement couldn't be closed - " + statementFailure.getMessage());
-                       } finally {
-                               statement = null;
-                       }
-               }
-
-               if (dbConn != null) {
-                       try {
-                               dbConn.close();
-                       } catch (SQLException connectionFailure) {
-                               LOG.info("Inputformat couldn't be closed - " + connectionFailure.getMessage());
-                       } finally {
-                               dbConn = null;
-                       }
-               }
-
-               parameterValues = null;
-       }
-
-       /**
-        * Binds the query parameters that belong to the given split (if any),
-        * executes the prepared statement and positions the result set on its
-        * first row.
-        * <p>
-        * The query runs in a <b>parallel fashion</b> when this {@link InputFormat}
-        * was built with a parameterized query (i.e. a {@link PreparedStatement})
-        * and a {@link ParameterValuesProvider}; otherwise the statement is
-        * executed as is, in a <b>non-parallel fashion</b>.
-        *
-        * @param inputSplit ignored when no parameter values are configured,
-        *        otherwise its <i>splitNumber</i> selects the row of parameter
-        *        values to bind
-        * @throws IOException declared by the overridden method; this
-        *         implementation reports failures as IllegalArgumentException
-        * @throws IllegalArgumentException if a parameter value is null or of an
-        *         unsupported type, or if binding or executing the query raises
-        *         an SQLException
-        */
-       @Override
-       public void open(InputSplit inputSplit) throws IOException {
-               try {
-                       if (inputSplit != null && parameterValues != null) {
-                               // look the split's parameter row up once instead of on every access
-                               final Object[] splitParameters = parameterValues[inputSplit.getSplitNumber()];
-                               for (int i = 0; i < splitParameters.length; i++) {
-                                       final Object param = splitParameters[i];
-                                       if (param == null) {
-                                               // a null value has no runtime type to dispatch on; fail with a
-                                               // descriptive message instead of a NullPointerException further down
-                                               throw new IllegalArgumentException("open() failed. Parameter " + i + " is null, which is not handled (yet).");
-                                       } else if (param instanceof String) {
-                                               statement.setString(i + 1, (String) param);
-                                       } else if (param instanceof Long) {
-                                               statement.setLong(i + 1, (Long) param);
-                                       } else if (param instanceof Integer) {
-                                               statement.setInt(i + 1, (Integer) param);
-                                       } else if (param instanceof Double) {
-                                               statement.setDouble(i + 1, (Double) param);
-                                       } else if (param instanceof Boolean) {
-                                               statement.setBoolean(i + 1, (Boolean) param);
-                                       } else if (param instanceof Float) {
-                                               statement.setFloat(i + 1, (Float) param);
-                                       } else if (param instanceof BigDecimal) {
-                                               statement.setBigDecimal(i + 1, (BigDecimal) param);
-                                       } else if (param instanceof Byte) {
-                                               statement.setByte(i + 1, (Byte) param);
-                                       } else if (param instanceof Short) {
-                                               statement.setShort(i + 1, (Short) param);
-                                       } else if (param instanceof Date) {
-                                               statement.setDate(i + 1, (Date) param);
-                                       } else if (param instanceof Time) {
-                                               statement.setTime(i + 1, (Time) param);
-                                       } else if (param instanceof Timestamp) {
-                                               statement.setTimestamp(i + 1, (Timestamp) param);
-                                       } else if (param instanceof Array) {
-                                               statement.setArray(i + 1, (Array) param);
-                                       } else {
-                                               //extends with other types if needed
-                                               throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." );
-                                       }
-                               }
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(splitParameters)));
-                               }
-                       }
-                       resultSet = statement.executeQuery();
-                       // remember whether there is a first row so reachedEnd() can answer without touching the result set
-                       hasNext = resultSet.next();
-               } catch (SQLException se) {
-                       throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
-               }
-       }
-
-       /**
-        * Closes the result set of the current split, if there is one. An
-        * SQLException raised while closing is logged and not rethrown.
-        *
-        * @throws IOException declared by the overridden method; not thrown by
-        *         this implementation
-        */
-       @Override
-       public void close() throws IOException {
-               if (resultSet != null) {
-                       try {
-                               resultSet.close();
-                       } catch (SQLException closeFailure) {
-                               LOG.info("Inputformat ResultSet couldn't be closed - " + closeFailure.getMessage());
-                       }
-               }
-       }
-
-       /**
-        * Tells whether every row of the current result set has been handed out.
-        *
-        * @return {@code true} once no further row is available, {@code false} otherwise
-        * @throws IOException declared by the overridden method; not thrown by
-        *         this implementation
-        */
-       @Override
-       public boolean reachedEnd() throws IOException {
-               final boolean rowPending = hasNext;
-               return !rowPending;
-       }
-
-       /**
-        * Stores the next resultSet row in a tuple
-        *
-        * @param row row to be reused.
-        * @return row containing next {@link Row}
-        * @throws java.io.IOException
-        */
-       @Override
-       public Row nextRecord(Row row) throws IOException {
-               try {
-                       if (!hasNext) {
-                               return null;
-                       }
-                       for (int pos = 0; pos < row.productArity(); pos++) {
-                               row.setField(pos, resultSet.getObject(pos + 1));
-                       }
-                       //update hasNext after we've read the record
-                       hasNext = resultSet.next();
-                       return row;
-               } catch (SQLException se) {
-                       throw new IOException("Couldn't read data - " + 
se.getMessage(), se);
-               } catch (NullPointerException npe) {
-                       throw new IOException("Couldn't access resultSet", npe);
-               }
-       }
-
-       /**
-        * Hands back the cached statistics unchanged; nothing is computed here.
-        *
-        * @param cachedStatistics previously cached statistics, returned as is
-        * @return the given {@code cachedStatistics}
-        */
-       @Override
-       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-               final BaseStatistics unchanged = cachedStatistics;
-               return unchanged;
-       }
-
-       /**
-        * Creates one split per row of parameter values, or a single split when
-        * no parameter values are configured.
-        *
-        * @param minNumSplits not used by this implementation
-        * @return the generated splits
-        */
-       @Override
-       public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
-               // without parameter values the whole query is a single split
-               final int splitCount = (parameterValues == null) ? 1 : parameterValues.length;
-               final GenericInputSplit[] splits = new GenericInputSplit[splitCount];
-               for (int partition = 0; partition < splitCount; partition++) {
-                       splits[partition] = new GenericInputSplit(partition, splitCount);
-               }
-               return splits;
-       }
-
-       /**
-        * Wraps the given splits in a {@link DefaultInputSplitAssigner}.
-        *
-        * @param inputSplits the splits to hand out
-        * @return a new assigner over {@code inputSplits}
-        */
-       @Override
-       public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
-               final InputSplitAssigner assigner = new DefaultInputSplitAssigner(inputSplits);
-               return assigner;
-       }
-
-       /**
-        * Creates a builder used to set the parameters of a
-        * {@link JDBCInputFormat} in a fluent way.
-        *
-        * @return a new builder
-        */
-       public static JDBCInputFormatBuilder buildJDBCInputFormat() {
-               final JDBCInputFormatBuilder builder = new JDBCInputFormatBuilder();
-               return builder;
-       }
-
-       /**
-        * Fluent builder that fills in the fields of a {@link JDBCInputFormat}
-        * and checks the mandatory ones in {@link #finish()}.
-        */
-       public static class JDBCInputFormatBuilder {
-               private final JDBCInputFormat format;
-
-               /** Starts from a forward-only, read-only result set configuration. */
-               public JDBCInputFormatBuilder() {
-                       final JDBCInputFormat created = new JDBCInputFormat();
-                       //using TYPE_FORWARD_ONLY for high performance reads
-                       created.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
-                       created.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
-                       this.format = created;
-               }
-
-               /** Sets the user name passed to the driver; may stay unset. */
-               public JDBCInputFormatBuilder setUsername(String username) {
-                       this.format.username = username;
-                       return this;
-               }
-
-               /** Sets the password passed to the driver together with the user name. */
-               public JDBCInputFormatBuilder setPassword(String password) {
-                       this.format.password = password;
-                       return this;
-               }
-
-               /** Sets the class name of the JDBC driver (mandatory). */
-               public JDBCInputFormatBuilder setDrivername(String drivername) {
-                       this.format.drivername = drivername;
-                       return this;
-               }
-
-               /** Sets the JDBC URL of the database (mandatory). */
-               public JDBCInputFormatBuilder setDBUrl(String dbURL) {
-                       this.format.dbURL = dbURL;
-                       return this;
-               }
-
-               /** Sets the query, or the query template when parameters are provided (mandatory). */
-               public JDBCInputFormatBuilder setQuery(String query) {
-                       this.format.queryTemplate = query;
-                       return this;
-               }
-
-               /** Overrides the result set type used when preparing the statement. */
-               public JDBCInputFormatBuilder setResultSetType(int resultSetType) {
-                       this.format.resultSetType = resultSetType;
-                       return this;
-               }
-
-               /** Overrides the result set concurrency used when preparing the statement. */
-               public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) {
-                       this.format.resultSetConcurrency = resultSetConcurrency;
-                       return this;
-               }
-
-               /** Takes the parameter values from the given provider; each row of values becomes one split. */
-               public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
-                       this.format.parameterValues = parameterValuesProvider.getParameterValues();
-                       return this;
-               }
-
-               /** Sets the type information of the produced rows (mandatory). */
-               public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
-                       this.format.rowTypeInfo = rowTypeInfo;
-                       return this;
-               }
-
-               /**
-                * Validates the configuration and returns the configured format.
-                *
-                * @return the configured {@link JDBCInputFormat}
-                * @throws IllegalArgumentException if the database URL, the query, the
-                *         driver name or the row type information is missing
-                */
-               public JDBCInputFormat finish() {
-                       // missing credentials are legal, they are only reported
-                       if (format.username == null) {
-                               LOG.info("Username was not supplied separately.");
-                       }
-                       if (format.password == null) {
-                               LOG.info("Password was not supplied separately.");
-                       }
-
-                       requireSupplied(format.dbURL, "No database URL supplied");
-                       requireSupplied(format.queryTemplate, "No query supplied");
-                       requireSupplied(format.drivername, "No driver supplied");
-                       requireSupplied(format.rowTypeInfo, "No " + RowTypeInfo.class.getSimpleName() + " supplied");
-
-                       if (format.parameterValues == null) {
-                               LOG.debug("No input splitting configured (data will be read with parallelism 1).");
-                       }
-                       return format;
-               }
-
-               /** Rejects a missing mandatory setting with the given message. */
-               private static void requireSupplied(Object setting, String message) {
-                       if (setting == null) {
-                               throw new IllegalArgumentException(message);
-                       }
-               }
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 
b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
deleted file mode 100644
index da4b1ad..0000000
--- 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * OutputFormat to write Rows into a database.
- * The OutputFormat has to be configured using the supplied JDBCOutputFormatBuilder.
- * 
- * @see Tuple
- * @see DriverManager
- */
-public class JDBCOutputFormat extends RichOutputFormat<Row> {
-       private static final long serialVersionUID = 1L;
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(JDBCOutputFormat.class);
-       
-       private String username;
-       private String password;
-       private String drivername;
-       private String dbURL;
-       private String query;
-       private int batchInterval = 5000;
-       
-       private Connection dbConn;
-       private PreparedStatement upload;
-       
-       private int batchCount = 0;
-       
-       public int[] typesArray;
-       
-       /**
-        * Creates an unconfigured format. The fields are populated by
-        * {@link JDBCOutputFormatBuilder}, obtained through
-        * {@link #buildJDBCOutputFormat()}.
-        */
-       public JDBCOutputFormat() {
-               // intentionally empty
-       }
-       
-       /**
-        * No-op: nothing is read from the given configuration.
-        *
-        * @param parameters ignored
-        */
-       @Override
-       public void configure(Configuration parameters) {
-               // nothing to configure
-       }
-       
-       /**
-        * Connects to the target database and prepares the configured query as
-        * the statement used for writing.
-        *
-        * @param taskNumber The number of the parallel instance; not used by this method.
-        * @param numTasks not used by this method.
-        * @throws IOException declared by the overridden method; this
-        *         implementation reports failures as IllegalArgumentException
-        * @throws IllegalArgumentException if the driver class cannot be found, or
-        *         if connecting or preparing the statement raises an SQLException
-        */
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException {
-               try {
-                       establishConnection();
-                       upload = dbConn.prepareStatement(query);
-               } catch (ClassNotFoundException driverMissing) {
-                       throw new IllegalArgumentException("JDBC driver class not found.", driverMissing);
-               } catch (SQLException sqlFailure) {
-                       throw new IllegalArgumentException("open() failed.", sqlFailure);
-               }
-       }
-       
-       /**
-        * Loads the driver class and opens the connection, passing credentials
-        * only when a user name was configured.
-        *
-        * @throws SQLException if the connection cannot be opened
-        * @throws ClassNotFoundException if the driver class cannot be found
-        */
-       private void establishConnection() throws SQLException, ClassNotFoundException {
-               Class.forName(drivername);
-               dbConn = (username != null)
-                       ? DriverManager.getConnection(dbURL, username, password)
-                       : DriverManager.getConnection(dbURL);
-       }
-       
-       /**
-        * Binds the fields of the given row to the prepared statement, adds the
-        * statement to the current batch and executes the batch once
-        * {@code batchInterval} records have been collected.
-        * <p>
-        * When this method is called, the output format is guaranteed to be opened.
-        * </p>
-        *
-        * WARNING: this may fail when no column types are specified, because a
-        * best effort approach is attempted in order to insert a null value and it
-        * is not guaranteed that the JDBC driver handles
-        * PreparedStatement.setObject(pos, null).
-        *
-        * @param row The record to add to the output.
-        * @see PreparedStatement
-        * @throws IOException declared by the overridden method; this
-        *         implementation reports failures as IllegalArgumentException
-        * @throws IllegalArgumentException if binding a value or executing the
-        *         batch raises an SQLException
-        */
-       @Override
-       public void writeRecord(Row row) throws IOException {
-
-               if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) {
-                       LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
-               }
-               try {
-
-                       if (typesArray == null) {
-                               // no types provided
-                               for (int index = 0; index < row.productArity(); index++) {
-                                       final Object value = row.productElement(index);
-                                       // SLF4J substitutes {} placeholders only; %s would be printed verbatim
-                                       LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, value);
-                                       upload.setObject(index + 1, value);
-                               }
-                       } else {
-                               // types provided
-                               for (int index = 0; index < row.productArity(); index++) {
-                                       final Object value = row.productElement(index);
-                                       final int sqlType = typesArray[index];
-
-                                       if (value == null) {
-                                               upload.setNull(index + 1, sqlType);
-                                       } else {
-                                               // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
-                                               switch (sqlType) {
-                                                       case java.sql.Types.NULL:
-                                                               upload.setNull(index + 1, sqlType);
-                                                               break;
-                                                       case java.sql.Types.BOOLEAN:
-                                                       case java.sql.Types.BIT:
-                                                               upload.setBoolean(index + 1, (boolean) value);
-                                                               break;
-                                                       case java.sql.Types.CHAR:
-                                                       case java.sql.Types.NCHAR:
-                                                       case java.sql.Types.VARCHAR:
-                                                       case java.sql.Types.LONGVARCHAR:
-                                                       case java.sql.Types.LONGNVARCHAR:
-                                                               upload.setString(index + 1, (String) value);
-                                                               break;
-                                                       case java.sql.Types.TINYINT:
-                                                               upload.setByte(index + 1, (byte) value);
-                                                               break;
-                                                       case java.sql.Types.SMALLINT:
-                                                               upload.setShort(index + 1, (short) value);
-                                                               break;
-                                                       case java.sql.Types.INTEGER:
-                                                               upload.setInt(index + 1, (int) value);
-                                                               break;
-                                                       case java.sql.Types.BIGINT:
-                                                               upload.setLong(index + 1, (long) value);
-                                                               break;
-                                                       case java.sql.Types.REAL:
-                                                               upload.setFloat(index + 1, (float) value);
-                                                               break;
-                                                       case java.sql.Types.FLOAT:
-                                                       case java.sql.Types.DOUBLE:
-                                                               upload.setDouble(index + 1, (double) value);
-                                                               break;
-                                                       case java.sql.Types.DECIMAL:
-                                                       case java.sql.Types.NUMERIC:
-                                                               upload.setBigDecimal(index + 1, (java.math.BigDecimal) value);
-                                                               break;
-                                                       case java.sql.Types.DATE:
-                                                               upload.setDate(index + 1, (java.sql.Date) value);
-                                                               break;
-                                                       case java.sql.Types.TIME:
-                                                               upload.setTime(index + 1, (java.sql.Time) value);
-                                                               break;
-                                                       case java.sql.Types.TIMESTAMP:
-                                                               upload.setTimestamp(index + 1, (java.sql.Timestamp) value);
-                                                               break;
-                                                       case java.sql.Types.BINARY:
-                                                       case java.sql.Types.VARBINARY:
-                                                       case java.sql.Types.LONGVARBINARY:
-                                                               upload.setBytes(index + 1, (byte[]) value);
-                                                               break;
-                                                       default:
-                                                               upload.setObject(index + 1, value);
-                                                               LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
-                                                                       sqlType, index + 1, value);
-                                                               // types that currently end up here:
-                                                               // java.sql.Types.SQLXML, ARRAY, JAVA_OBJECT, BLOB, CLOB, NCLOB,
-                                                               // DATALINK, DISTINCT, OTHER, REF, ROWID, STRUCT
-                                               }
-                                       }
-                               }
-                       }
-                       upload.addBatch();
-                       batchCount++;
-                       // flush once the configured number of records has been collected
-                       if (batchCount >= batchInterval) {
-                               upload.executeBatch();
-                               batchCount = 0;
-                       }
-               } catch (SQLException | IllegalArgumentException e) {
-                       throw new IllegalArgumentException("writeRecord() failed", e);
-               }
-       }
-       
-       /**
-        * Executes prepared statement and closes all resources of this 
instance.
-        *
-        * @throws IOException Thrown, if the input could not be closed 
properly.
-        */
-       @Override
-       public void close() throws IOException {
-               try {
-                       if (upload != null) {
-                               upload.executeBatch();
-                               upload.close();
-                       }
-               } catch (SQLException se) {
-                       LOG.info("Inputformat couldn't be closed - " + 
se.getMessage());
-               } finally {
-                       upload = null;
-                       batchCount = 0;
-               }
-               
-               try {
-                       if (dbConn != null) {
-                               dbConn.close();
-                       }
-               } catch (SQLException se) {
-                       LOG.info("Inputformat couldn't be closed - " + 
se.getMessage());
-               } finally {
-                       dbConn = null;
-               }
-       }
-       
-       /**
-        * Creates a new {@link JDBCOutputFormatBuilder}.
-        *
-        * @return a fresh builder instance
-        */
-       public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
-               return new JDBCOutputFormatBuilder();
-       }
-       
-       /**
-        * Fluent builder that configures a {@link JDBCOutputFormat} and validates the
-        * configuration in {@link #finish()}.
-        */
-       public static class JDBCOutputFormatBuilder {
-               private final JDBCOutputFormat format;
-
-               protected JDBCOutputFormatBuilder() {
-                       this.format = new JDBCOutputFormat();
-               }
-
-               /** Sets the user name; optional, a missing value is only logged by {@link #finish()}. */
-               public JDBCOutputFormatBuilder setUsername(String username) {
-                       format.username = username;
-                       return this;
-               }
-
-               /** Sets the password; optional, a missing value is only logged by {@link #finish()}. */
-               public JDBCOutputFormatBuilder setPassword(String password) {
-                       format.password = password;
-                       return this;
-               }
-
-               /** Sets the JDBC driver class name; required. */
-               public JDBCOutputFormatBuilder setDrivername(String drivername) {
-                       format.drivername = drivername;
-                       return this;
-               }
-
-               /** Sets the database URL; required. */
-               public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
-                       format.dbURL = dbURL;
-                       return this;
-               }
-
-               /** Sets the statement that is executed for every record; required. */
-               public JDBCOutputFormatBuilder setQuery(String query) {
-                       format.query = query;
-                       return this;
-               }
-
-               /** Sets the number of records that are batched before the batch is executed. */
-               public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
-                       format.batchInterval = batchInterval;
-                       return this;
-               }
-
-               /**
-                * Sets the SQL types (see {@code java.sql.Types}) of the statement parameters,
-                * one entry per column, in column order.
-                */
-               public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
-                       format.typesArray = typesArray;
-                       return this;
-               }
-
-               /**
-                * Finalizes the configuration and checks validity.
-                *
-                * @return Configured JDBCOutputFormat
-                * @throws IllegalArgumentException if the database URL, the query or the driver
-                *         name has not been set
-                */
-               public JDBCOutputFormat finish() {
-                       // credentials are optional, so their absence is only logged
-                       if (format.username == null) {
-                               LOG.info("Username was not supplied separately.");
-                       }
-                       if (format.password == null) {
-                               LOG.info("Password was not supplied separately.");
-                       }
-                       if (format.dbURL == null) {
-                               throw new IllegalArgumentException("No database URL supplied.");
-                       }
-                       if (format.query == null) {
-                               throw new IllegalArgumentException("No query supplied");
-                       }
-                       if (format.drivername == null) {
-                               throw new IllegalArgumentException("No driver supplied");
-                       }
-
-                       return format;
-               }
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
 
b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
deleted file mode 100644
index 2ed2f8c..0000000
--- 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc.split;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-
-/**
- * A {@link ParameterValuesProvider} that performs no computation of its own: it only
- * wraps the query parameters that the user computed before creating the
- * {@link JDBCInputFormat} instance.
- */
-public class GenericParameterValuesProvider implements ParameterValuesProvider {
-
-       /** The externally computed query parameters, handed back unchanged. */
-       private final Serializable[][] precomputed;
-
-       public GenericParameterValuesProvider(Serializable[][] parameters) {
-               precomputed = parameters;
-       }
-
-       @Override
-       public Serializable[][] getParameterValues() {
-               // nothing to compute here: the values were supplied by the caller
-               return precomputed;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
 
b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
deleted file mode 100644
index ac56b98..0000000
--- 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc.split;
-
-import java.io.Serializable;
-
-/**
- * This query generator assumes that the query to parameterize contains a BETWEEN
- * constraint on a numeric column. The inclusive range <CODE>[min, max]</CODE> is cut
- * into consecutive sub-ranges that each cover <CODE>fetchSize</CODE> values; only the
- * last sub-range may cover fewer values.
- *
- * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>,
- * using a query like:
- * <PRE>
- *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
- * </PRE>
- *
- * you can use this class to automatically generate the parameters of the BETWEEN clause,
- * based on the passed constructor parameters.
- */
-public class NumericBetweenParametersProvider implements ParameterValuesProvider {
-
-       private final long fetchSize;
-       private final long min;
-       private final long max;
-
-       /**
-        * @param fetchSize number of values covered by each generated sub-range, must be positive
-        * @param min lower bound of the range (inclusive)
-        * @param max upper bound of the range (inclusive)
-        * @throws IllegalArgumentException if {@code fetchSize} is not positive
-        */
-       public NumericBetweenParametersProvider(long fetchSize, long min, long max) {
-               if (fetchSize <= 0) {
-                       throw new IllegalArgumentException("fetchSize must be positive, but was " + fetchSize);
-               }
-               this.fetchSize = fetchSize;
-               this.min = min;
-               this.max = max;
-       }
-
-       /**
-        * Computes the lower/upper bound pairs of all sub-ranges.
-        *
-        * @return one {@code Long[]{lower, upper}} row per sub-range, in ascending order;
-        *         an empty matrix if {@code min > max}
-        * @throws IllegalArgumentException if the range is too large to be represented, or
-        *         would be split into more sub-ranges than an array can hold
-        */
-       @Override
-       public Serializable[][] getParameterValues() {
-               if (min > max) {
-                       return new Serializable[0][2];
-               }
-
-               // max >= min here, so a negative difference can only stem from an overflow
-               final long span = max - min;
-               if (span < 0) {
-                       throw new IllegalArgumentException("Range [" + min + ", " + max + "] is too large");
-               }
-
-               // ceil((span + 1) / fetchSize), computed without floating point and without overflow
-               final long splitCount = span / fetchSize + 1;
-               if (splitCount > Integer.MAX_VALUE) {
-                       throw new IllegalArgumentException("Range [" + min + ", " + max + "] with fetchSize "
-                               + fetchSize + " yields too many splits: " + splitCount);
-               }
-
-               Serializable[][] parameters = new Serializable[(int) splitCount][];
-               long lower = min;
-               for (int i = 0; i < parameters.length; i++) {
-                       // the last sub-range is clamped to max; all others are exactly fetchSize wide
-                       boolean last = (i == parameters.length - 1);
-                       long upper = last ? max : lower + fetchSize - 1;
-                       parameters[i] = new Long[]{lower, upper};
-                       lower = upper + 1;
-               }
-               return parameters;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
 
b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
deleted file mode 100644
index c194497..0000000
--- 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc.split;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-
-/**
- * This interface is used by the {@link JDBCInputFormat} to compute the list of
- * queries to run in parallel (i.e. the splits).
- * Each query is parameterized using one row of the matrix provided by the
- * {@link ParameterValuesProvider} implementation.
- */
-public interface ParameterValuesProvider {
-
-       /**
-        * Returns the parameter matrix used to query a table in parallel.
-        *
-        * @return the parameter values, one row per query
-        */
-       public Serializable[][] getParameterValues();
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
 
b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
deleted file mode 100644
index da9469b..0000000
--- 
a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Types;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import 
org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
-import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
-import org.apache.flink.api.table.Row;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JDBCFullTest extends JDBCTestBase {
-
-       @Test
-       public void testJdbcInOut() throws Exception {
-               //run without parallelism
-               runTest(false);
-
-               //cleanup
-               JDBCTestBase.tearDownClass();
-               JDBCTestBase.prepareTestDb();
-               
-               //run expliting parallelism
-               runTest(true);
-               
-       }
-
-       private void runTest(boolean exploitParallelism) {
-               ExecutionEnvironment environment = 
ExecutionEnvironment.getExecutionEnvironment();
-               JDBCInputFormatBuilder inputBuilder = 
JDBCInputFormat.buildJDBCInputFormat()
-                               .setDrivername(JDBCTestBase.DRIVER_CLASS)
-                               .setDBUrl(JDBCTestBase.DB_URL)
-                               .setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
-                               .setRowTypeInfo(rowTypeInfo);
-
-               if(exploitParallelism) {
-                       final int fetchSize = 1;
-                       final Long min = new 
Long(JDBCTestBase.testData[0][0].toString());
-                       final Long max = new 
Long(JDBCTestBase.testData[JDBCTestBase.testData.length - 
fetchSize][0].toString());
-                       //use a "splittable" query to exploit parallelism
-                       inputBuilder = inputBuilder
-                                       
.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
-                                       .setParametersProvider(new 
NumericBetweenParametersProvider(fetchSize, min, max));
-               }
-               DataSet<Row> source = 
environment.createInput(inputBuilder.finish());
-
-               //NOTE: in this case (with Derby driver) setSqlTypes could be 
skipped, but
-               //some database, doens't handle correctly null values when no 
column type specified
-               //in PreparedStatement.setObject (see its javadoc for more 
details)
-               source.output(JDBCOutputFormat.buildJDBCOutputFormat()
-                               .setDrivername(JDBCTestBase.DRIVER_CLASS)
-                               .setDBUrl(JDBCTestBase.DB_URL)
-                               .setQuery("insert into newbooks 
(id,title,author,price,qty) values (?,?,?,?,?)")
-                               .setSqlTypes(new int[]{Types.INTEGER, 
Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER})
-                               .finish());
-               try {
-                       environment.execute();
-               } catch (Exception e) {
-                       Assert.fail("JDBC full test failed. " + e.getMessage());
-               }
-
-               try (
-                       Connection dbConn = 
DriverManager.getConnection(JDBCTestBase.DB_URL);
-                       PreparedStatement statement = 
dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
-                       ResultSet resultSet = statement.executeQuery()
-               ) {
-                       int count = 0;
-                       while (resultSet.next()) {
-                               count++;
-                       }
-                       Assert.assertEquals(JDBCTestBase.testData.length, 
count);
-               } catch (SQLException e) {
-                       Assert.fail("JDBC full test failed. " + e.getMessage());
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
 
b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
deleted file mode 100644
index efae076..0000000
--- 
a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.ResultSet;
-
-import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
-import 
org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
-import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.core.io.InputSplit;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link JDBCInputFormat}: rejection of invalid configurations and reading
- * with and without input splitting.
- */
-public class JDBCInputFormatTest extends JDBCTestBase {
-
-       private JDBCInputFormat jdbcInputFormat;
-
-       @After
-       public void tearDown() throws IOException {
-               if (jdbcInputFormat != null) {
-                       jdbcInputFormat.close();
-               }
-               jdbcInputFormat = null;
-       }
-
-       // NOTE(review): this test also uses a non-existing driver, so it may pass because of
-       // the driver rather than because of the missing row type info - confirm and consider
-       // using DRIVER_CLASS here.
-       @Test(expected = IllegalArgumentException.class)
-       public void testUntypedRowInfo() throws IOException {
-               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-                               .setDrivername("org.apache.derby.jdbc.idontexist")
-                               .setDBUrl(DB_URL)
-                               .setQuery(SELECT_ALL_BOOKS)
-                               .finish();
-               jdbcInputFormat.openInputFormat();
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidDriver() throws IOException {
-               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-                               .setDrivername("org.apache.derby.jdbc.idontexist")
-                               .setDBUrl(DB_URL)
-                               .setQuery(SELECT_ALL_BOOKS)
-                               .setRowTypeInfo(rowTypeInfo)
-                               .finish();
-               jdbcInputFormat.openInputFormat();
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidURL() throws IOException {
-               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
-                               .setQuery(SELECT_ALL_BOOKS)
-                               .setRowTypeInfo(rowTypeInfo)
-                               .finish();
-               jdbcInputFormat.openInputFormat();
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidQuery() throws IOException {
-               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setDBUrl(DB_URL)
-                               .setQuery("iamnotsql")
-                               .setRowTypeInfo(rowTypeInfo)
-                               .finish();
-               jdbcInputFormat.openInputFormat();
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testIncompleteConfiguration() throws IOException {
-               // no database URL is configured
-               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setQuery(SELECT_ALL_BOOKS)
-                               .setRowTypeInfo(rowTypeInfo)
-                               .finish();
-       }
-
-       @Test
-       public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException {
-               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setDBUrl(DB_URL)
-                               .setQuery(SELECT_ALL_BOOKS)
-                               .setRowTypeInfo(rowTypeInfo)
-                               .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
-                               .finish();
-               // this query does not exploit parallelism
-               Assert.assertEquals(1, jdbcInputFormat.createInputSplits(1).length);
-               jdbcInputFormat.openInputFormat();
-               jdbcInputFormat.open(null);
-               Row row = new Row(5);
-               int recordCount = 0;
-               while (!jdbcInputFormat.reachedEnd()) {
-                       Row next = jdbcInputFormat.nextRecord(row);
-                       if (next == null) {
-                               break;
-                       }
-                       assertFieldTypes(next);
-                       assertMatchesTestData(recordCount, next);
-                       recordCount++;
-               }
-               jdbcInputFormat.close();
-               jdbcInputFormat.closeInputFormat();
-               Assert.assertEquals(testData.length, recordCount);
-       }
-
-       @Test
-       public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
-               final int fetchSize = 1;
-               // ids of the first and the last test record
-               final long min = Long.parseLong(String.valueOf(JDBCTestBase.testData[0][0]));
-               final long max = Long.parseLong(String.valueOf(JDBCTestBase.testData[JDBCTestBase.testData.length - 1][0]));
-               ParameterValuesProvider paramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
-               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setDBUrl(DB_URL)
-                               .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
-                               .setRowTypeInfo(rowTypeInfo)
-                               .setParametersProvider(paramProvider)
-                               .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
-                               .finish();
-
-               jdbcInputFormat.openInputFormat();
-               InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
-               // this query exploits parallelism (1 split for every id)
-               Assert.assertEquals(testData.length, splits.length);
-               int recordCount = 0;
-               Row row = new Row(5);
-               for (int i = 0; i < splits.length; i++) {
-                       jdbcInputFormat.open(splits[i]);
-                       while (!jdbcInputFormat.reachedEnd()) {
-                               Row next = jdbcInputFormat.nextRecord(row);
-                               if (next == null) {
-                                       break;
-                               }
-                               assertFieldTypes(next);
-                               assertMatchesTestData(recordCount, next);
-                               recordCount++;
-                       }
-                       jdbcInputFormat.close();
-               }
-               jdbcInputFormat.closeInputFormat();
-               Assert.assertEquals(testData.length, recordCount);
-       }
-
-       @Test
-       public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException {
-               Serializable[][] queryParameters = new String[2][1];
-               queryParameters[0] = new String[]{"Kumar"};
-               queryParameters[1] = new String[]{"Tan Ah Teck"};
-               ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
-               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setDBUrl(DB_URL)
-                               .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
-                               .setRowTypeInfo(rowTypeInfo)
-                               .setParametersProvider(paramProvider)
-                               .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
-                               .finish();
-               jdbcInputFormat.openInputFormat();
-               InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
-               // this query exploits parallelism (1 split for every queryParameters row)
-               Assert.assertEquals(queryParameters.length, splits.length);
-               int recordCount = 0;
-               Row row = new Row(5);
-               for (int i = 0; i < splits.length; i++) {
-                       jdbcInputFormat.open(splits[i]);
-                       while (!jdbcInputFormat.reachedEnd()) {
-                               Row next = jdbcInputFormat.nextRecord(row);
-                               if (next == null) {
-                                       break;
-                               }
-                               assertFieldTypes(next);
-                               recordCount++;
-                       }
-                       jdbcInputFormat.close();
-               }
-               Assert.assertEquals(3, recordCount);
-               jdbcInputFormat.closeInputFormat();
-       }
-
-       @Test
-       public void testEmptyResults() throws IOException, InstantiationException, IllegalAccessException {
-               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setDBUrl(DB_URL)
-                               .setQuery(SELECT_EMPTY)
-                               .setRowTypeInfo(rowTypeInfo)
-                               .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
-                               .finish();
-               jdbcInputFormat.openInputFormat();
-               jdbcInputFormat.open(null);
-               Row row = new Row(5);
-               int recordsCnt = 0;
-               while (!jdbcInputFormat.reachedEnd()) {
-                       Assert.assertNull(jdbcInputFormat.nextRecord(row));
-                       recordsCnt++;
-               }
-               jdbcInputFormat.close();
-               jdbcInputFormat.closeInputFormat();
-               Assert.assertEquals(0, recordsCnt);
-       }
-
-       /** Checks the Java type of every non-null field of a row read from the books table. */
-       private static void assertFieldTypes(Row row) {
-               assertFieldType(row, 0, "Field 0 should be int", Integer.class);
-               assertFieldType(row, 1, "Field 1 should be String", String.class);
-               assertFieldType(row, 2, "Field 2 should be String", String.class);
-               assertFieldType(row, 3, "Field 3 should be double", Double.class);
-               assertFieldType(row, 4, "Field 4 should be int", Integer.class);
-       }
-
-       /** Checks the type of a single field; null fields are skipped. */
-       private static void assertFieldType(Row row, int pos, String message, Class<?> expected) {
-               Object value = row.productElement(pos);
-               if (value != null) {
-                       Assert.assertEquals(message, expected, value.getClass());
-               }
-       }
-
-       /** Compares a row with the test record at the given index; null test values are skipped. */
-       private static void assertMatchesTestData(int recordIndex, Row row) {
-               for (int x = 0; x < 5; x++) {
-                       if (testData[recordIndex][x] != null) {
-                               Assert.assertEquals(testData[recordIndex][x], row.productElement(x));
-                       }
-               }
-       }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
 
b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index 086a84c..0000000
--- 
a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest extends JDBCTestBase {
-
-       private JDBCOutputFormat jdbcOutputFormat;
-       private Tuple5<Integer, String, String, Double, String> tuple5 = new 
Tuple5<>();
-
-       @After
-       public void tearDown() throws IOException {
-               if (jdbcOutputFormat != null) {
-                       jdbcOutputFormat.close();
-               }
-               jdbcOutputFormat = null;
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidDriver() throws IOException {
-               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-                               
.setDrivername("org.apache.derby.jdbc.idontexist")
-                               .setDBUrl(DB_URL)
-                               .setQuery(String.format(INSERT_TEMPLATE, 
INPUT_TABLE))
-                               .finish();
-               jdbcOutputFormat.open(0, 1);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidURL() throws IOException {
-               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
-                               .setQuery(String.format(INSERT_TEMPLATE, 
INPUT_TABLE))
-                               .finish();
-               jdbcOutputFormat.open(0, 1);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidQuery() throws IOException {
-               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setDBUrl(DB_URL)
-                               .setQuery("iamnotsql")
-                               .finish();
-               jdbcOutputFormat.open(0, 1);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testIncompleteConfiguration() throws IOException {
-               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setQuery(String.format(INSERT_TEMPLATE, 
INPUT_TABLE))
-                               .finish();
-       }
-
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testIncompatibleTypes() throws IOException {
-               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setDBUrl(DB_URL)
-                               .setQuery(String.format(INSERT_TEMPLATE, 
INPUT_TABLE))
-                               .finish();
-               jdbcOutputFormat.open(0, 1);
-
-               tuple5.setField(4, 0);
-               tuple5.setField("hello", 1);
-               tuple5.setField("world", 2);
-               tuple5.setField(0.99, 3);
-               tuple5.setField("imthewrongtype", 4);
-
-               Row row = new Row(tuple5.getArity());
-               for (int i = 0; i < tuple5.getArity(); i++) {
-                       row.setField(i, tuple5.getField(i));
-               }
-               jdbcOutputFormat.writeRecord(row);
-               jdbcOutputFormat.close();
-       }
-
-       @Test
-       public void testJDBCOutputFormat() throws IOException, 
InstantiationException, IllegalAccessException {
-
-               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-                               .setDrivername(DRIVER_CLASS)
-                               .setDBUrl(DB_URL)
-                               .setQuery(String.format(INSERT_TEMPLATE, 
OUTPUT_TABLE))
-                               .finish();
-               jdbcOutputFormat.open(0, 1);
-
-               for (int i = 0; i < testData.length; i++) {
-                       Row row = new Row(testData[i].length);
-                       for (int j = 0; j < testData[i].length; j++) {
-                               row.setField(j, testData[i][j]);
-                       }
-                       jdbcOutputFormat.writeRecord(row);
-               }
-
-               jdbcOutputFormat.close();
-
-               try (
-                       Connection dbConn = 
DriverManager.getConnection(JDBCTestBase.DB_URL);
-                       PreparedStatement statement = 
dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
-                       ResultSet resultSet = statement.executeQuery()
-               ) {
-                       int recordCount = 0;
-                       while (resultSet.next()) {
-                               Row row = new Row(tuple5.getArity());
-                               for (int i = 0; i < tuple5.getArity(); i++) {
-                                       row.setField(i, resultSet.getObject(i + 
1));
-                               }
-                               if (row.productElement(0) != null) {
-                                       Assert.assertEquals("Field 0 should be 
int", Integer.class, row.productElement(0).getClass());
-                               }
-                               if (row.productElement(1) != null) {
-                                       Assert.assertEquals("Field 1 should be 
String", String.class, row.productElement(1).getClass());
-                               }
-                               if (row.productElement(2) != null) {
-                                       Assert.assertEquals("Field 2 should be 
String", String.class, row.productElement(2).getClass());
-                               }
-                               if (row.productElement(3) != null) {
-                                       Assert.assertEquals("Field 3 should be 
float", Double.class, row.productElement(3).getClass());
-                               }
-                               if (row.productElement(4) != null) {
-                                       Assert.assertEquals("Field 4 should be 
int", Integer.class, row.productElement(4).getClass());
-                               }
-
-                               for (int x = 0; x < tuple5.getArity(); x++) {
-                                       if 
(JDBCTestBase.testData[recordCount][x] != null) {
-                                               
Assert.assertEquals(JDBCTestBase.testData[recordCount][x], 
row.productElement(x));
-                                       }
-                               }
-
-                               recordCount++;
-                       }
-                       Assert.assertEquals(JDBCTestBase.testData.length, 
recordCount);
-               } catch (SQLException e) {
-                       Assert.fail("JDBC OutputFormat test failed. " + 
e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
 
b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
deleted file mode 100644
index 69ad693..0000000
--- 
a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-
-/**
- * Base test class for JDBC Input and Output formats
- */
-public class JDBCTestBase {
-       
-       public static final String DRIVER_CLASS = 
"org.apache.derby.jdbc.EmbeddedDriver";
-       public static final String DB_URL = "jdbc:derby:memory:ebookshop";
-       public static final String INPUT_TABLE = "books";
-       public static final String OUTPUT_TABLE = "newbooks";
-       public static final String SELECT_ALL_BOOKS = "select * from " + 
INPUT_TABLE;
-       public static final String SELECT_ALL_NEWBOOKS = "select * from " + 
OUTPUT_TABLE;
-       public static final String SELECT_EMPTY = "select * from books WHERE 
QTY < 0";
-       public static final String INSERT_TEMPLATE = "insert into %s (id, 
title, author, price, qty) values (?,?,?,?,?)";
-       public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = 
JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
-       public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = 
JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
-       
-       protected static Connection conn;
-
-       public static final Object[][] testData = {
-                       {1001, ("Java public for dummies"), ("Tan Ah Teck"), 
11.11, 11},
-                       {1002, ("More Java for dummies"), ("Tan Ah Teck"), 
22.22, 22},
-                       {1003, ("More Java for more dummies"), ("Mohammad 
Ali"), 33.33, 33},
-                       {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
-                       {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 
55},
-                       {1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 
66.66, 66},
-                       {1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 
77.77, 77},
-                       {1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 
88.88, 88},
-                       {1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 
99.99, 99},
-                       {1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), 
null, 1010}};
-
-       public static final TypeInformation<?>[] fieldTypes = new 
TypeInformation<?>[] {
-               BasicTypeInfo.INT_TYPE_INFO,
-               BasicTypeInfo.STRING_TYPE_INFO,
-               BasicTypeInfo.STRING_TYPE_INFO,
-               BasicTypeInfo.DOUBLE_TYPE_INFO,
-               BasicTypeInfo.INT_TYPE_INFO
-       };
-       
-       public static final RowTypeInfo rowTypeInfo = new 
RowTypeInfo(fieldTypes);
-
-       public static String getCreateQuery(String tableName) {
-               StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
");
-               sqlQueryBuilder.append(tableName).append(" (");
-               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-               sqlQueryBuilder.append("PRIMARY KEY (id))");
-               return sqlQueryBuilder.toString();
-       }
-       
-       public static String getInsertQuery() {
-               StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO 
books (id, title, author, price, qty) VALUES ");
-               for (int i = 0; i < JDBCTestBase.testData.length; i++) {
-                       sqlQueryBuilder.append("(")
-                       .append(JDBCTestBase.testData[i][0]).append(",'")
-                       .append(JDBCTestBase.testData[i][1]).append("','")
-                       .append(JDBCTestBase.testData[i][2]).append("',")
-                       .append(JDBCTestBase.testData[i][3]).append(",")
-                       .append(JDBCTestBase.testData[i][4]).append(")");
-                       if (i < JDBCTestBase.testData.length - 1) {
-                               sqlQueryBuilder.append(",");
-                       }
-               }
-               String insertQuery = sqlQueryBuilder.toString();
-               return insertQuery;
-       }
-       
-       public static final OutputStream DEV_NULL = new OutputStream() {
-               @Override
-               public void write(int b) {
-               }
-       };
-
-       public static void prepareTestDb() throws Exception {
-               System.setProperty("derby.stream.error.field", 
JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
-               Class.forName(DRIVER_CLASS);
-               Connection conn = DriverManager.getConnection(DB_URL + 
";create=true");
-
-               //create input table
-               Statement stat = conn.createStatement();
-               stat.executeUpdate(getCreateQuery(INPUT_TABLE));
-               stat.close();
-
-               //create output table
-               stat = conn.createStatement();
-               stat.executeUpdate(getCreateQuery(OUTPUT_TABLE));
-               stat.close();
-
-               //prepare input data
-               stat = conn.createStatement();
-               stat.execute(JDBCTestBase.getInsertQuery());
-               stat.close();
-
-               conn.close();
-       }
-
-       @BeforeClass
-       public static void setUpClass() throws SQLException {
-               try {
-                       System.setProperty("derby.stream.error.field", 
JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
-                       prepareDerbyDatabase();
-               } catch (ClassNotFoundException e) {
-                       e.printStackTrace();
-                       Assert.fail();
-               }
-       }
-
-       private static void prepareDerbyDatabase() throws 
ClassNotFoundException, SQLException {
-               Class.forName(DRIVER_CLASS);
-               conn = DriverManager.getConnection(DB_URL + ";create=true");
-               createTable(INPUT_TABLE);
-               createTable(OUTPUT_TABLE);
-               insertDataIntoInputTable();
-               conn.close();
-       }
-       
-       private static void createTable(String tableName) throws SQLException {
-               Statement stat = conn.createStatement();
-               stat.executeUpdate(getCreateQuery(tableName));
-               stat.close();
-       }
-       
-       private static void insertDataIntoInputTable() throws SQLException {
-               Statement stat = conn.createStatement();
-               stat.execute(JDBCTestBase.getInsertQuery());
-               stat.close();
-       }
-
-       @AfterClass
-       public static void tearDownClass() {
-               cleanUpDerbyDatabases();
-       }
-
-       private static void cleanUpDerbyDatabases() {
-               try {
-                       Class.forName(DRIVER_CLASS);
-                       conn = DriverManager.getConnection(DB_URL + 
";create=true");
-                       Statement stat = conn.createStatement();
-                       stat.executeUpdate("DROP TABLE "+INPUT_TABLE);
-                       stat.executeUpdate("DROP TABLE "+OUTPUT_TABLE);
-                       stat.close();
-                       conn.close();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail();
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties 
b/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
deleted file mode 100644
index 2fb9345..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml 
b/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/pom.xml b/flink-batch-connectors/pom.xml
deleted file mode 100644
index d4f65b3..0000000
--- a/flink-batch-connectors/pom.xml
+++ /dev/null
@@ -1,45 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-parent</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-
-       <artifactId>flink-batch-connectors</artifactId>
-       <name>flink-batch-connectors</name>
-       <packaging>pom</packaging>
-
-       <modules>
-               <module>flink-avro</module>
-               <module>flink-jdbc</module>
-               <module>flink-hadoop-compatibility</module>
-               <module>flink-hbase</module>
-               <module>flink-hcatalog</module>
-       </modules>
-       
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/pom.xml 
b/flink-connectors/flink-avro/pom.xml
new file mode 100644
index 0000000..cdd7c78
--- /dev/null
+++ b/flink-connectors/flink-avro/pom.xml
@@ -0,0 +1,216 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+       
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-avro_2.10</artifactId>
+       <name>flink-avro</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <!-- core dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.avro</groupId>
+                       <artifactId>avro</artifactId>
+                       <!-- version is derived from base module -->
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <artifactId>maven-assembly-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>create-test-dependency</id>
+                                               
<phase>process-test-classes</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <archive>
+                                                               <manifest>
+                                                                       
<mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass>
+                                                               </manifest>
+                                                       </archive>
+                                                       
<finalName>maven</finalName>
+                                                       <attach>false</attach>
+                                                       <descriptors>
+                                                               
<descriptor>src/test/assembly/test-assembly.xml</descriptor>
+                                                       </descriptors>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <!--Remove the AvroExternalJarProgram code from the test-classes directory since it mustn't be in the
+                       classpath when running the tests to actually test whether the user code class loader
+                       is properly used.-->
+                       <plugin>
+                               <artifactId>maven-clean-plugin</artifactId>
+                               <version>2.5</version><!--$NO-MVN-MAN-VER$-->
+                               <executions>
+                                       <execution>
+                                               
<id>remove-avroexternalprogram</id>
+                                               
<phase>process-test-classes</phase>
+                                               <goals>
+                                                       <goal>clean</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<excludeDefaultDirectories>true</excludeDefaultDirectories>
+                                                       <filesets>
+                                                               <fileset>
+                                                                       
<directory>${project.build.testOutputDirectory}</directory>
+                                                                       
<includes>
+                                                                               
<include>**/testjar/*.class</include>
+                                                                       
</includes>
+                                                               </fileset>
+                                                       </filesets>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <!-- Generate Test class from avro schema -->
+                       <plugin>
+                               <groupId>org.apache.avro</groupId>
+                               <artifactId>avro-maven-plugin</artifactId>
+                               <version>1.7.7</version>
+                               <executions>
+                                       <execution>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>schema</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+                                                       
<testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+               
+               <pluginManagement>
+                       <plugins>
+                               <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+                               <plugin>
+                                       <groupId>org.eclipse.m2e</groupId>
+                                       
<artifactId>lifecycle-mapping</artifactId>
+                                       <version>1.0.0</version>
+                                       <configuration>
+                                               <lifecycleMappingMetadata>
+                                                       <pluginExecutions>
+                                                               
<pluginExecution>
+                                                                       
<pluginExecutionFilter>
+                                                                               
<groupId>org.apache.maven.plugins</groupId>
+                                                                               
<artifactId>maven-assembly-plugin</artifactId>
+                                                                               
<versionRange>[2.4,)</versionRange>
+                                                                               
<goals>
+                                                                               
        <goal>single</goal>
+                                                                               
</goals>
+                                                                       
</pluginExecutionFilter>
+                                                                       <action>
+                                                                               
<ignore/>
+                                                                       
</action>
+                                                               
</pluginExecution>
+                                                               
<pluginExecution>
+                                                                       
<pluginExecutionFilter>
+                                                                               
<groupId>org.apache.maven.plugins</groupId>
+                                                                               
<artifactId>maven-clean-plugin</artifactId>
+                                                                               
<versionRange>[1,)</versionRange>
+                                                                               
<goals>
+                                                                               
        <goal>clean</goal>
+                                                                               
</goals>
+                                                                       
</pluginExecutionFilter>
+                                                                       <action>
+                                                                               
<ignore/>
+                                                                       
</action>
+                                                               
</pluginExecution>
+                                                               
<pluginExecution>
+                                                                       
<pluginExecutionFilter>
+                                                                               
<groupId>org.apache.avro</groupId>
+                                                                               
<artifactId>avro-maven-plugin</artifactId>
+                                                                               
<versionRange>[1.7.7,)</versionRange>
+                                                                               
<goals>
+                                                                               
        <goal>schema</goal>
+                                                                               
</goals>
+                                                                       
</pluginExecutionFilter>
+                                                                       <action>
+                                                                               
<ignore/>
+                                                                       
</action>
+                                                               
</pluginExecution>
+                                                       </pluginExecutions>
+                                               </lifecycleMappingMetadata>
+                                       </configuration>
+                               </plugin>
+                       </plugins>
+               </pluginManagement>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
 
b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
new file mode 100644
index 0000000..59da4cb
--- /dev/null
+++ 
b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+
+/**
+ * An Avro {@link Decoder} that reads its data from a {@link DataInput}.
+ *
+ * <p>Primitive values are read with the corresponding fixed-width {@link DataInput} methods.
+ * The lengths of byte sequences and strings are read as plain ints, and the block counts of
+ * arrays and maps use the variable-length encoding implemented by
+ * {@link #readVarLongCount(DataInput)}.
+ *
+ * <p>The input must be set via {@link #setIn(DataInput)} before any read or skip method is called.
+ */
+public class DataInputDecoder extends Decoder {
+
+       /** Reusable buffer that backs {@link #readString()}. */
+       private final Utf8 stringDecoder = new Utf8();
+
+       /** The input all read and skip methods consume from; {@code null} until {@link #setIn(DataInput)} is called. */
+       private DataInput in;
+
+       /**
+        * Sets the input that all subsequent calls read from.
+        *
+        * @param in the data input to decode from
+        */
+       public void setIn(DataInput in) {
+               this.in = in;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // primitives
+       // --------------------------------------------------------------------------------------------
+
+       /** Reads a null value; nothing is consumed from the input. */
+       @Override
+       public void readNull() {}
+
+       @Override
+       public boolean readBoolean() throws IOException {
+               return in.readBoolean();
+       }
+
+       @Override
+       public int readInt() throws IOException {
+               return in.readInt();
+       }
+
+       @Override
+       public long readLong() throws IOException {
+               return in.readLong();
+       }
+
+       @Override
+       public float readFloat() throws IOException {
+               return in.readFloat();
+       }
+
+       @Override
+       public double readDouble() throws IOException {
+               return in.readDouble();
+       }
+
+       /** Reads an enum value, which is stored as a plain int. */
+       @Override
+       public int readEnum() throws IOException {
+               return readInt();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // bytes
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Reads exactly {@code length} bytes into {@code bytes}, beginning at index {@code start}.
+        */
+       @Override
+       public void readFixed(byte[] bytes, int start, int length) throws IOException {
+               in.readFully(bytes, start, length);
+       }
+
+       /**
+        * Reads a length-prefixed byte sequence.
+        *
+        * @param old a buffer to reuse; it is reused only if it is non-null, array-backed and its
+        *            capacity is at least the length read, otherwise a new buffer is allocated
+        * @return a buffer positioned at 0 whose limit is the number of bytes read
+        */
+       @Override
+       public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+               int length = readInt();
+               ByteBuffer result;
+               if (old != null && length <= old.capacity() && old.hasArray()) {
+                       result = old;
+                       result.clear();
+               } else {
+                       result = ByteBuffer.allocate(length);
+               }
+               in.readFully(result.array(), result.arrayOffset() + result.position(), length);
+               result.limit(length);
+               return result;
+       }
+
+       /** Skips {@code length} bytes of a fixed-size value. */
+       @Override
+       public void skipFixed(int length) throws IOException {
+               skipBytes(length);
+       }
+
+       /** Skips a length-prefixed byte sequence: reads the int length, then skips that many bytes. */
+       @Override
+       public void skipBytes() throws IOException {
+               int num = readInt();
+               skipBytes(num);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // strings
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Reads a length-prefixed string.
+        *
+        * @param old an instance to reuse, or {@code null} to have a new one created
+        * @return {@code old} (or the new instance) holding the bytes that were read
+        */
+       @Override
+       public Utf8 readString(Utf8 old) throws IOException {
+               int length = readInt();
+               Utf8 result = (old != null ? old : new Utf8());
+               result.setByteLength(length);
+
+               if (length > 0) {
+                       in.readFully(result.getBytes(), 0, length);
+               }
+
+               return result;
+       }
+
+       /** Reads a length-prefixed string through the internal reusable {@link Utf8} buffer. */
+       @Override
+       public String readString() throws IOException {
+               return readString(stringDecoder).toString();
+       }
+
+       /** Skips a length-prefixed string: reads the int length, then skips that many bytes. */
+       @Override
+       public void skipString() throws IOException {
+               int len = readInt();
+               skipBytes(len);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // collection types
+       //
+       // All methods below only read the variable-length count of the next block and return it.
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public long readArrayStart() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       @Override
+       public long arrayNext() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       @Override
+       public long skipArray() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       @Override
+       public long readMapStart() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       @Override
+       public long mapNext() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       @Override
+       public long skipMap() throws IOException {
+               return readVarLongCount(in);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // union
+       // --------------------------------------------------------------------------------------------
+
+       /** Reads the index of a union branch, which is stored as a plain int. */
+       @Override
+       public int readIndex() throws IOException {
+               return readInt();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // utils
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Skips exactly {@code num} bytes of the input.
+        *
+        * @param num the number of bytes to skip; values less than or equal to zero skip nothing
+        * @throws java.io.EOFException if the input ends before {@code num} bytes were skipped
+        * @throws IOException if the underlying input fails
+        */
+       private void skipBytes(int num) throws IOException {
+               while (num > 0) {
+                       int skipped = in.skipBytes(num);
+                       if (skipped > 0) {
+                               num -= skipped;
+                       } else {
+                               // DataInput.skipBytes may skip nothing (for example at the end of the input)
+                               // and never throws EOFException. Consume a single byte explicitly so that the
+                               // loop either makes progress or fails with an EOFException instead of
+                               // spinning forever.
+                               in.readByte();
+                               num--;
+                       }
+               }
+       }
+
+       /**
+        * Reads a variable-length encoded count.
+        *
+        * <p>Each byte carries seven bits of the value, least-significant group first. The high bit
+        * is set on every byte except the last one.
+        *
+        * @param in the input to read from
+        * @return the decoded count
+        * @throws IOException if reading from the input fails
+        */
+       public static long readVarLongCount(DataInput in) throws IOException {
+               long value = in.readUnsignedByte();
+
+               if ((value & 0x80) == 0) {
+                       // single-byte encoding
+                       return value;
+               }
+               else {
+                       long curr;
+                       int shift = 7;
+                       value = value & 0x7f;
+                       // bytes with the high bit set are followed by more bytes
+                       while (((curr = in.readUnsignedByte()) & 0x80) != 0){
+                               value |= (curr & 0x7f) << shift;
+                               shift += 7;
+                       }
+                       // final byte: high bit is clear, so it can be merged in as is
+                       value |= curr << shift;
+                       return value;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
 
b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
new file mode 100644
index 0000000..0102cc1
--- /dev/null
+++ 
b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.io.Encoder;
+import org.apache.avro.util.Utf8;
+
+
+/**
+ * An Avro {@link Encoder} that writes its data to a {@link DataOutput}.
+ *
+ * <p>Primitive values are written with the corresponding fixed-width {@link DataOutput} methods.
+ * Byte sequences and strings are prefixed with their length as a plain int, and the block counts
+ * of arrays and maps use the variable-length encoding implemented by
+ * {@link #writeVarLongCount(DataOutput, long)}.
+ *
+ * <p>The output must be set via {@link #setOut(DataOutput)} before any write method is called.
+ */
+public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * The output all write methods emit to. Marked transient because {@link DataOutput} does not
+        * extend {@link java.io.Serializable}; a non-transient reference would make serializing an
+        * encoder with an attached output fail. After deserialization the output has to be set again
+        * via {@link #setOut(DataOutput)}.
+        */
+       private transient DataOutput out;
+
+       /**
+        * Sets the output that all subsequent calls write to.
+        *
+        * @param out the data output to encode to
+        */
+       public void setOut(DataOutput out) {
+               this.out = out;
+       }
+
+       /** Does nothing; this encoder does not buffer data itself. */
+       @Override
+       public void flush() throws IOException {}
+
+       // --------------------------------------------------------------------------------------------
+       // primitives
+       // --------------------------------------------------------------------------------------------
+
+       /** Writes a null value; nothing is emitted to the output. */
+       @Override
+       public void writeNull() {}
+
+       @Override
+       public void writeBoolean(boolean b) throws IOException {
+               out.writeBoolean(b);
+       }
+
+       @Override
+       public void writeInt(int n) throws IOException {
+               out.writeInt(n);
+       }
+
+       @Override
+       public void writeLong(long n) throws IOException {
+               out.writeLong(n);
+       }
+
+       @Override
+       public void writeFloat(float f) throws IOException {
+               out.writeFloat(f);
+       }
+
+       @Override
+       public void writeDouble(double d) throws IOException {
+               out.writeDouble(d);
+       }
+
+       /** Writes an enum value as a plain int. */
+       @Override
+       public void writeEnum(int e) throws IOException {
+               out.writeInt(e);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // bytes
+       // --------------------------------------------------------------------------------------------
+
+       /** Writes {@code len} bytes of {@code bytes}, beginning at index {@code start}, without a length prefix. */
+       @Override
+       public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+               out.write(bytes, start, len);
+       }
+
+       /** Writes {@code len} as a plain int, followed by that many bytes of {@code bytes} from index {@code start}. */
+       @Override
+       public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+               out.writeInt(len);
+               if (len > 0) {
+                       out.write(bytes, start, len);
+               }
+       }
+
+       /** Writes the number of remaining bytes of the buffer as a plain int, followed by those bytes. */
+       @Override
+       public void writeBytes(ByteBuffer bytes) throws IOException {
+               int num = bytes.remaining();
+               out.writeInt(num);
+
+               if (num > 0) {
+                       writeFixed(bytes);
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // strings
+       // --------------------------------------------------------------------------------------------
+
+       /** Writes the string as a length-prefixed sequence of the bytes produced by {@link Utf8#getBytesFor(String)}. */
+       @Override
+       public void writeString(String str) throws IOException {
+               byte[] bytes = Utf8.getBytesFor(str);
+               writeBytes(bytes, 0, bytes.length);
+       }
+
+       /** Writes the first {@code utf8.getByteLength()} bytes of the string, prefixed with that length. */
+       @Override
+       public void writeString(Utf8 utf8) throws IOException {
+               writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // collection types
+       // --------------------------------------------------------------------------------------------
+
+       /** Does nothing; an array start is not marked in the output. */
+       @Override
+       public void writeArrayStart() {}
+
+       /**
+        * Writes the item count of the following block. Counts that are not positive are not written
+        * here; the zero that terminates a collection is emitted by {@link #writeArrayEnd()} and
+        * {@link #writeMapEnd()}.
+        */
+       @Override
+       public void setItemCount(long itemCount) throws IOException {
+               if (itemCount > 0) {
+                       writeVarLongCount(out, itemCount);
+               }
+       }
+
+       /** Does nothing; items are not individually marked in the output. */
+       @Override
+       public void startItem() {}
+
+       @Override
+       public void writeArrayEnd() throws IOException {
+               // write a single byte 0, shortcut for a var-length long of 0
+               out.write(0);
+       }
+
+       /** Does nothing; a map start is not marked in the output. */
+       @Override
+       public void writeMapStart() {}
+
+       @Override
+       public void writeMapEnd() throws IOException {
+               // write a single byte 0, shortcut for a var-length long of 0
+               out.write(0);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // union
+       // --------------------------------------------------------------------------------------------
+
+       /** Writes the index of a union branch as a plain int. */
+       @Override
+       public void writeIndex(int unionIndex) throws IOException {
+               out.writeInt(unionIndex);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // utils
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Writes a non-negative count in a variable-length encoding.
+        *
+        * <p>Each byte carries seven bits of the value, least-significant group first. The high bit
+        * is set on every byte except the last one.
+        *
+        * @param out the output to write to
+        * @param val the count to write
+        * @throws IOException if {@code val} is negative or writing to the output fails
+        */
+       public static void writeVarLongCount(DataOutput out, long val) throws IOException {
+               if (val < 0) {
+                       throw new IOException("Illegal count (must be non-negative): " + val);
+               }
+
+               // emit seven bits at a time while more than seven significant bits remain;
+               // DataOutput.write(int) only writes the low eight bits of its argument
+               while ((val & ~0x7FL) != 0) {
+                       out.write(((int) val) | 0x80);
+                       val >>>= 7;
+               }
+               out.write((int) val);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
 
b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..709c4f1
--- /dev/null
+++ 
b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.avro;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.avro.file.SeekableInput;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+
+/**
+ * Adapter that exposes a Flink {@link FSDataInputStream} as an Avro {@link SeekableInput}.
+ *
+ * <p>The code is adapted from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
+ *
+ * <p>The wrapper keeps track of the position in the data stream itself, based on the bytes read
+ * and the seeks performed through this wrapper.
+ */
+public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
+       /** The wrapped stream that all calls are forwarded to. */
+       private final FSDataInputStream stream;
+       /** Position in the stream as tracked by this wrapper. */
+       private long pos;
+       /** Length reported by {@link #length()}, as passed to the constructor. */
+       private final long len;
+
+       /**
+        * Creates a wrapper around the given stream, with the tracked position starting at 0.
+        *
+        * @param stream the stream to read from
+        * @param len the length to report from {@link #length()}
+        */
+       public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
+               this.stream = stream;
+               this.pos = 0;
+               this.len = len;
+       }
+
+       /** Returns the length that was passed to the constructor. */
+       public long length() throws IOException {
+               return this.len;
+       }
+
+       /**
+        * Reads up to {@code len} bytes into {@code b}, beginning at index {@code off}, and advances
+        * the tracked position by the number of bytes actually read.
+        *
+        * @return the value returned by the wrapped stream's read call
+        */
+       public int read(byte[] b, int off, int len) throws IOException {
+               int read = stream.read(b, off, len);
+               // A return value of -1 signals the end of the stream; adding it would move the
+               // tracked position backwards, so only positive byte counts advance the position.
+               if (read > 0) {
+                       pos += read;
+               }
+               return read;
+       }
+
+       /** Seeks the wrapped stream to position {@code p} and records it as the tracked position. */
+       public void seek(long p) throws IOException {
+               stream.seek(p);
+               pos = p;
+       }
+
+       /** Returns the position tracked by this wrapper. */
+       public long tell() throws IOException {
+               return pos;
+       }
+
+       /** Closes the wrapped stream. */
+       public void close() throws IOException {
+               stream.close();
+       }
+}

Reply via email to