http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
new file mode 100644
index 0000000..3dddd88
--- /dev/null
+++ 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test that scans a small, pre-populated HBase table through a
+ * {@link TableInputFormat} subclass and verifies that all rows, and only those, are read.
+ */
+public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
+       private static final String TEST_TABLE_NAME = "TableInputFormatTestTable";
+       private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
+       private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
+
+       // These are the row ids AND also the values we will put in the test table
+       private static final String[] ROW_IDS = {"000", "111", "222", "333", "444", "555", "666", "777", "888", "999"};
+
+       @BeforeClass
+       public static void activateHBaseCluster(){
+               registerHBaseMiniClusterInClasspath();
+       }
+
+       /**
+        * Asks the base class to create the test table (passing the split keys "0", "3", "6"
+        * and "9") and stores one row per entry of {@link #ROW_IDS}, using the row id as the
+        * cell value as well.
+        *
+        * @throws IOException if the table cannot be created, opened or written
+        */
+       @Before
+       public void createTestTable() throws IOException {
+               TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
+               byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), "6".getBytes(), "9".getBytes()};
+               createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+               HTable table = openTable(tableName);
+
+               try {
+                       for (String rowId : ROW_IDS) {
+                               byte[] rowIdBytes = rowId.getBytes();
+                               Put p = new Put(rowIdBytes);
+                               // Use the rowId as the value to facilitate the testing better
+                               p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, rowIdBytes);
+                               table.put(p);
+                       }
+               } finally {
+                       // release the table handle even when one of the puts fails
+                       table.close();
+               }
+       }
+
+       /**
+        * Input format over the test table that emits the stored cell value of every row.
+        *
+        * <p>Declared static: it only reads static constants, and a non-static inner class
+        * would drag a hidden reference to the enclosing test instance along with the format.
+        */
+       static class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
+               @Override
+               protected Scan getScanner() {
+                       // unrestricted scan: the test expects every row of the table
+                       return new Scan();
+               }
+
+               @Override
+               protected String getTableName() {
+                       return TEST_TABLE_NAME;
+               }
+
+               @Override
+               protected Tuple1<String> mapResultToTuple(Result r) {
+                       return new Tuple1<>(new String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
+               }
+       }
+
+       /**
+        * Runs a job that reads the test table and checks that every id of {@link #ROW_IDS}
+        * is returned and that no additional records show up.
+        *
+        * @throws Exception if the job fails; propagated so that the test report contains the
+        *                   complete stack trace instead of only the exception message
+        */
+       @Test
+       public void testTableInputFormat() throws Exception {
+               ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+               environment.setParallelism(1);
+
+               DataSet<String> resultDataSet =
+                       environment.createInput(new InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
+                               @Override
+                               public String map(Tuple1<String> value) throws Exception {
+                                       return value.f0;
+                               }
+                       });
+
+               List<String> resultSet = new ArrayList<>();
+               resultDataSet.output(new LocalCollectionOutputFormat<>(resultSet));
+
+               environment.execute("HBase InputFormat Test");
+
+               for (String rowId : ROW_IDS) {
+                       assertTrue("Missing rowId from table: " + rowId, resultSet.contains(rowId));
+               }
+
+               assertEquals("The number of records is wrong.", ROW_IDS.length, resultSet.size());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
new file mode 100644
index 0000000..8579dee
--- /dev/null
+++ 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+/**
+ * Constants shared by the HBase read and write examples of this package.
+ */
+public final class HBaseFlinkTestConstants {
+
+       public static final byte[] CF_SOME = "someCf".getBytes();
+       public static final byte[] Q_SOME = "someQual".getBytes();
+       public static final String TEST_TABLE_NAME = "test-table";
+       public static final String TMP_DIR = "/tmp/test";
+
+       private HBaseFlinkTestConstants() {
+               // constants holder, not meant to be instantiated
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
new file mode 100644
index 0000000..dccf876
--- /dev/null
+++ 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.addons.hbase.TableInputFormat;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Simple stub for HBase DataSet read.
+ *
+ * <p>To run the test first create the test table with hbase shell.
+ *
+ * <p>Use the following commands:
+ * <ul>
+ *     <li>create 'test-table', 'someCf'</li>
+ *     <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
+ *     <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
+ * </ul>
+ *
+ * <p>The test should return just the first entry.
+ */
+public class HBaseReadExample {
+
+       /**
+        * Reads (row key, value) pairs from the test table, keeps the pairs whose value starts
+        * with "someStr" and prints them.
+        *
+        * @param args ignored
+        * @throws Exception if the job fails
+        */
+       public static void main(String[] args) throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               @SuppressWarnings("serial")
+               DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
+
+                               @Override
+                               public String getTableName() {
+                                       return HBaseFlinkTestConstants.TEST_TABLE_NAME;
+                               }
+
+                               @Override
+                               protected Scan getScanner() {
+                                       // only fetch the single column the example is interested in
+                                       Scan scan = new Scan();
+                                       scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME);
+                                       return scan;
+                               }
+
+                               // one tuple instance is handed out for every record
+                               private Tuple2<String, String> reuse = new Tuple2<String, String>();
+
+                               @Override
+                               protected Tuple2<String, String> mapResultToTuple(Result r) {
+                                       String key = Bytes.toString(r.getRow());
+                                       String val = Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME));
+                                       reuse.setField(key, 0);
+                                       reuse.setField(val, 1);
+                                       return reuse;
+                               }
+               })
+               .filter(new FilterFunction<Tuple2<String,String>>() {
+
+                       @Override
+                       public boolean filter(Tuple2<String, String> t) throws Exception {
+                               String val = t.getField(1);
+                               return val.startsWith("someStr");
+                       }
+               });
+
+               // print() is an eager sink: it runs the job itself. A following env.execute()
+               // would fail because no new sink has been defined since that execution.
+               hbaseDs.print();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
new file mode 100644
index 0000000..483bdff
--- /dev/null
+++ 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Simple stub for HBase DataSet write
+ * 
+ * To run the test first create the test table with hbase shell.
+ * 
+ * Use the following commands:
+ * <ul>
+ *     <li>create 'test-table', 'someCf'</li>
+ * </ul>
+ * 
+ */
+@SuppressWarnings("serial")
+public class HBaseWriteExample {
+       
+       // 
*************************************************************************
+       //     PROGRAM
+       // 
*************************************************************************
+       
+       /**
+        * Counts the words of the input text and writes one HBase mutation per word: the word
+        * is the row key and its count the cell value.
+        *
+        * @param args either empty (built-in sample text, default table) or
+        *             {@code <text path> <output table>}
+        * @throws Exception if setting up or running the job fails
+        */
+       public static void main(String[] args) throws Exception {
+
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               // set up the execution environment
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               // get input data
+               DataSet<String> text = getTextDataSet(env);
+
+               DataSet<Tuple2<String, Integer>> counts =
+                               // split up the lines in pairs (2-tuples) containing: (word,1)
+                               text.flatMap(new Tokenizer())
+                               // group by the tuple field "0" and sum up tuple field "1"
+                               .groupBy(0)
+                               .sum(1);
+
+               // emit result
+               Job job = Job.getInstance();
+               job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
+               // TODO is "mapred.output.dir" really useful?
+               job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR);
+               counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
+                       private transient Tuple2<Text, Mutation> reuse;
+
+                       @Override
+                       public void open(Configuration parameters) throws Exception {
+                               super.open(parameters);
+                               reuse = new Tuple2<Text, Mutation>();
+                       }
+
+                       @Override
+                       public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
+                               reuse.f0 = new Text(t.f0);
+                               // encode the row key with the same helper as the value, so the bytes
+                               // do not depend on the platform default charset
+                               Put put = new Put(Bytes.toBytes(t.f0));
+                               put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, Bytes.toBytes(t.f1));
+                               reuse.f1 = put;
+                               return reuse;
+                       }
+               }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
+
+               // execute program
+               env.execute("WordCount (HBase sink) Example");
+       }
+       
+       // 
*************************************************************************
+       //     USER FUNCTIONS
+       // 
*************************************************************************
+       
+       /**
+        * User-defined {@code FlatMapFunction} that tokenizes a line of text. The line is
+        * lower-cased and cut at every run of non-word characters; each non-empty token is
+        * emitted as a {@code (word, 1)} pair of type {@code Tuple2<String, Integer>}.
+        */
+       public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+                       // normalize the line and walk over its pieces
+                       for (String word : value.toLowerCase().split("\\W+")) {
+                               if (word.length() == 0) {
+                                       // a line starting with a separator yields an empty first piece
+                                       continue;
+                               }
+                               Tuple2<String, Integer> pair = new Tuple2<String, Integer>(word, 1);
+                               out.collect(pair);
+                       }
+               }
+       }
+       
+       // 
*************************************************************************
+       //     UTIL METHODS
+       // 
*************************************************************************
+       // NOTE: despite its name this flag is about the INPUT: it is set as soon as command
+       // line arguments are given and makes getTextDataSet read the text from textPath
+       private static boolean fileOutput = false;
+       // path of the input text file; only assigned when exactly two arguments are given
+       private static String textPath;
+       // HBase table the counts are written to; falls back to the shared test table name
+       private static String outputTableName = 
HBaseFlinkTestConstants.TEST_TABLE_NAME;
+       
+       /**
+        * Interprets the command line. Without arguments the built-in sample text and the
+        * default table are used; with exactly two arguments they are taken as input text
+        * path and output table name. Any other argument count prints the usage to stderr.
+        *
+        * @param args the program arguments
+        * @return {@code false} if the arguments are unusable and the program should stop,
+        *         {@code true} otherwise
+        */
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length == 0) {
+                       System.out.println("Executing HBaseWriteExample example with built-in default data.");
+                       System.out.println("  Provide parameters to read input data from a file.");
+                       System.out.println("  Usage: HBaseWriteExample <text path> <output table>");
+                       return true;
+               }
+
+               // any argument switches to file mode, even before the count is validated
+               fileOutput = true;
+
+               if (args.length != 2) {
+                       System.err.println("Usage: HBaseWriteExample <text path> <output table>");
+                       return false;
+               }
+
+               textPath = args[0];
+               outputTableName = args[1];
+               return true;
+       }
+       
+       /**
+        * Provides the text to count: the file at {@code textPath} when arguments were given,
+        * the built-in sample lines otherwise.
+        *
+        * @param env environment used to create the data set
+        * @return the lines of text
+        */
+       private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+               if (!fileOutput) {
+                       // no arguments given: fall back to the default test text data
+                       return getDefaultTextLineDataSet(env);
+               }
+               // read the text file from given input path
+               return env.readTextFile(textPath);
+       }
+       /**
+        * Builds a data set from the built-in {@code WORDS} sample lines.
+        *
+        * @param env environment used to create the data set
+        * @return one element per sample line
+        */
+       private static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
+               DataSet<String> sampleLines = env.fromElements(WORDS);
+               return sampleLines;
+       }
+       // Built-in sample input, one array element per line of text; used by
+       // getDefaultTextLineDataSet when no input file is given on the command line.
+       private static final String[] WORDS = new String[] {
+               "To be, or not to be,--that is the question:--",
+               "Whether 'tis nobler in the mind to suffer",
+               "The slings and arrows of outrageous fortune",
+               "Or to take arms against a sea of troubles,",
+               "And by opposing end them?--To die,--to sleep,--",
+               "No more; and by a sleep to say we end",
+               "The heartache, and the thousand natural shocks",
+               "That flesh is heir to,--'tis a consummation",
+               "Devoutly to be wish'd. To die,--to sleep;--",
+               "To sleep! perchance to dream:--ay, there's the rub;",
+               "For in that sleep of death what dreams may come,",
+               "When we have shuffled off this mortal coil,",
+               "Must give us pause: there's the respect",
+               "That makes calamity of so long life;",
+               "For who would bear the whips and scorns of time,",
+               "The oppressor's wrong, the proud man's contumely,",
+               "The pangs of despis'd love, the law's delay,",
+               "The insolence of office, and the spurns",
+               "That patient merit of the unworthy takes,",
+               "When he himself might his quietus make",
+               "With a bare bodkin? who would these fardels bear,",
+               "To grunt and sweat under a weary life,",
+               "But that the dread of something after death,--",
+               "The undiscover'd country, from whose bourn",
+               "No traveller returns,--puzzles the will,",
+               "And makes us rather bear those ills we have",
+               "Than fly to others that we know not of?",
+               "Thus conscience does make cowards of us all;",
+               "And thus the native hue of resolution",
+               "Is sicklied o'er with the pale cast of thought;",
+               "And enterprises of great pith and moment,",
+               "With this regard, their currents turn awry,",
+               "And lose the name of action.--Soft you now!",
+               "The fair Ophelia!--Nymph, in thy orisons",
+               "Be all my sins remember'd."
+       };
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
new file mode 100644
index 0000000..05398db
--- /dev/null
+++ 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.example;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This is an example of how to write streams into HBase. In this example the
+ * stream will be written into a local HBase, but it is possible to adapt this
+ * example for an HBase running in a cloud. You need a running local HBase with a
+ * table "flinkExample" and a column "entry". If your HBase configuration does
+ * not fit the hbase-site.xml in the resource folder, then you have to temporarily
+ * delete this hbase-site.xml to execute the example properly.
+ */
+public class HBaseWriteStreamExample {
+
+       /**
+        * Feeds an endless stream of random numbers (as strings) into {@link HBaseOutputFormat}.
+        *
+        * @param args ignored
+        * @throws Exception if the streaming job fails
+        */
+       public static void main(String[] args) throws Exception {
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment
+                               .getExecutionEnvironment();
+
+               // data stream with random numbers
+               DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
+                       private static final long serialVersionUID = 1L;
+
+                       // volatile: cancel() flips the flag while run() is looping
+                       private volatile boolean isRunning = true;
+
+                       @Override
+                       public void run(SourceContext<String> out) throws Exception {
+                               while (isRunning) {
+                                       out.collect(String.valueOf(Math.floor(Math.random() * 100)));
+                               }
+
+                       }
+
+                       @Override
+                       public void cancel() {
+                               isRunning = false;
+                       }
+               });
+               dataStream.writeUsingOutputFormat(new HBaseOutputFormat());
+
+               env.execute();
+       }
+
+       /**
+        * This class implements an OutputFormat for HBase. Every record results in one put
+        * into the table "flinkExample", column family and qualifier "entry".
+        */
+       private static class HBaseOutputFormat implements OutputFormat<String> {
+
+               private org.apache.hadoop.conf.Configuration conf = null;
+               private HTable table = null;
+               private String taskNumber = null;
+               private int rowNumber = 0;
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void configure(Configuration parameters) {
+                       conf = HBaseConfiguration.create();
+               }
+
+               @Override
+               public void open(int taskNumber, int numTasks) throws IOException {
+                       table = new HTable(conf, "flinkExample");
+                       this.taskNumber = String.valueOf(taskNumber);
+               }
+
+               @Override
+               public void writeRecord(String record) throws IOException {
+                       // The separator keeps row keys unique across tasks: without it task 1/row 10
+                       // and task 11/row 0 would both produce the key "110".
+                       Put put = new Put(Bytes.toBytes(taskNumber + "-" + rowNumber));
+                       // NOTE(review): the record itself is not stored, only the running row
+                       // number -- confirm that this is intended for the example.
+                       put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
+                                       Bytes.toBytes(rowNumber));
+                       rowNumber++;
+                       table.put(put);
+               }
+
+               @Override
+               public void close() throws IOException {
+                       if (table == null) {
+                               // open() was never called or failed before the table was created
+                               return;
+                       }
+                       try {
+                               table.flushCommits();
+                       } finally {
+                               // release the table even when flushing the buffered puts fails
+                               table.close();
+                       }
+               }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties 
b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..804ff45
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/resources/log4j-test.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=DEBUG, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.threshold=INFO
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hcatalog/pom.xml 
b/flink-connectors/flink-hcatalog/pom.xml
new file mode 100644
index 0000000..dde7996
--- /dev/null
+++ b/flink-connectors/flink-hcatalog/pom.xml
@@ -0,0 +1,182 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+       
+       <modelVersion>4.0.0</modelVersion>
+       
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-hcatalog</artifactId>
+       <name>flink-hcatalog</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-hadoop-compatibility_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hive.hcatalog</groupId>
+                       <artifactId>hcatalog-core</artifactId>
+                       <version>0.12.0</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.json</groupId>
+                                       <artifactId>json</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-library</artifactId>
+                       <scope>provided</scope>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <executions>
+                                       <!-- Run scala compiler in the 
process-resources phase, so that dependencies on
+                                               scala classes can be resolved 
later in the (Java) compile phase -->
+                                       <execution>
+                                               <id>scala-compile-first</id>
+                                               <phase>process-resources</phase>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                               </goals>
+                                       </execution>
+
+                                       <!-- Run scala compiler in the 
process-test-resources phase, so that dependencies on
+                                                scala classes can be resolved 
later in the (Java) test-compile phase -->
+                                       <execution>
+                                               <id>scala-test-compile</id>
+                                               
<phase>process-test-resources</phase>
+                                               <goals>
+                                                       <goal>testCompile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <jvmArgs>
+                                               <jvmArg>-Xms128m</jvmArg>
+                                               <jvmArg>-Xmx512m</jvmArg>
+                                       </jvmArgs>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Eclipse Integration -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <version>2.8</version>
+                               <configuration>
+                                       <downloadSources>true</downloadSources>
+                                       <projectnatures>
+                                               
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                                               
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                                       </projectnatures>
+                                       <buildcommands>
+                                               
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                                       </buildcommands>
+                                       <classpathContainers>
+                                               
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                                               
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                                       </classpathContainers>
+                                       <excludes>
+                                               
<exclude>org.scala-lang:scala-library</exclude>
+                                               
<exclude>org.scala-lang:scala-compiler</exclude>
+                                       </excludes>
+                                       <sourceIncludes>
+                                               
<sourceInclude>**/*.scala</sourceInclude>
+                                               
<sourceInclude>**/*.java</sourceInclude>
+                                       </sourceIncludes>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Adding scala source directories to build path -->
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               
<artifactId>build-helper-maven-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <!-- Add src/main/scala to eclipse 
build path -->
+                                       <execution>
+                                               <id>add-source</id>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>add-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               
<source>src/main/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                                       <!-- Add src/test/scala to eclipse 
build path -->
+                                       <execution>
+                                               <id>add-test-source</id>
+                                               
<phase>generate-test-sources</phase>
+                                               <goals>
+                                                       
<goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               
<source>src/test/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!-- Scala Code Style, most of the configuration done 
via plugin management -->
+                       <plugin>
+                               <groupId>org.scalastyle</groupId>
+                               <artifactId>scalastyle-maven-plugin</artifactId>
+                               <configuration>
+                                       
<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+                               </configuration>
+                       </plugin>
+
+               </plugins>
+       </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
 
b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
new file mode 100644
index 0000000..859b706
--- /dev/null
+++ 
b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog;
+
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and partition filters.
+ *
+ * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or as Flink-native tuples.
+ *
+ * Note: Flink tuples might only support a limited number of fields (depending on the API).
+ *
+ * @param <T> The type of the records returned by this InputFormat.
+ */
+public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, 
HadoopInputSplit> implements ResultTypeQueryable<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       // Hadoop configuration of this format; written by writeObject() and read back by readObject().
+       private Configuration configuration;
+
+       // Wrapped HCatalog input format; obtained from setInput() in the constructor and
+       // re-instantiated in readObject().
+       private org.apache.hive.hcatalog.mapreduce.HCatInputFormat 
hCatInputFormat;
+       // Reader for the split most recently passed to open().
+       private RecordReader<WritableComparable, HCatRecord> recordReader;
+       // True after fetchNext() has advanced the reader and until nextRecord() consumes that record;
+       // reset to false by open().
+       private boolean fetched = false;
+       // Result of the most recent recordReader.nextKeyValue() call made by fetchNext().
+       private boolean hasNext;
+
+       // Tuple field names; stays empty unless asFlinkTuples() was called. nextRecord() builds
+       // Flink tuples only when this array is non-empty.
+       protected String[] fieldNames = new String[0];
+       // Schema of the records returned by this format; narrowed by getFields().
+       protected HCatSchema outputSchema;
+
+       // Type information handed out by getProducedType(); set by the three-argument constructor
+       // and by asFlinkTuples(). It is not written by writeObject().
+       private TypeInformation<T> resultType;
+
+       /** No-argument constructor; performs no initialization of its own. */
+       public HCatInputFormatBase() { }
+
+       /**
+        * Creates a HCatInputFormat for the given database and table, using a freshly created
+        * {@link org.apache.hadoop.conf.Configuration}.
+        * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
+        * The return type of the InputFormat can be changed to Flink-native tuples by calling
+        * {@link HCatInputFormatBase#asFlinkTuples()}.
+        *
+        * @param database The name of the database to read from.
+        * @param table The name of the table to read.
+        * @throws java.io.IOException Propagated from the three-argument constructor this one delegates to.
+        */
+       public HCatInputFormatBase(String database, String table) throws 
IOException {
+               this(database, table, new Configuration());
+       }
+
+       /**
+        * Creates a HCatInputFormat for the given database, table, and
+        * {@link org.apache.hadoop.conf.Configuration}.
+        * By default, the InputFormat returns {@link 
org.apache.hive.hcatalog.data.HCatRecord}.
+        * The return type of the InputFormat can be changed to Flink-native 
tuples by calling
+        * {@link HCatInputFormatBase#asFlinkTuples()}.
+        *
+        * @param database The name of the database to read from.
+        * @param table The name of the table to read.
+        * @param config The Configuration for the InputFormat.
+        * @throws java.io.IOException
+        */
+       public HCatInputFormatBase(String database, String table, Configuration 
config) throws IOException {
+               super();
+               this.configuration = config;
+               HadoopUtils.mergeHadoopConf(this.configuration);
+
+               this.hCatInputFormat = 
org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, 
database, table);
+               this.outputSchema = 
org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
+
+               // configure output schema of HCatFormat
+               configuration.set("mapreduce.lib.hcat.output.schema", 
HCatUtil.serialize(outputSchema));
+               // set type information
+               this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
+       }
+
+       /**
+        * Restricts the output of the InputFormat to the given fields, in the given order.
+        * The new output schema replaces the current one and is also written to the configuration.
+        *
+        * @param fields The fields and their order which are returned by the InputFormat.
+        * @return This InputFormat with specified return fields.
+        * @throws java.io.IOException Propagated from the schema lookup or schema serialization.
+        */
+       public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
+               // look up the schema of every requested field, preserving the requested order
+               final ArrayList<HCatFieldSchema> projection = new ArrayList<HCatFieldSchema>(fields.length);
+               for (int i = 0; i < fields.length; i++) {
+                       projection.add(this.outputSchema.get(fields[i]));
+               }
+               this.outputSchema = new HCatSchema(projection);
+
+               // publish the projected schema through the configuration
+               final String serializedSchema = HCatUtil.serialize(this.outputSchema);
+               this.configuration.set("mapreduce.lib.hcat.output.schema", serializedSchema);
+
+               return this;
+       }
+
+       /**
+        * Specifies a SQL-like filter condition on the table's partition columns.
+        * Filter conditions on non-partition columns are invalid.
+        * A partition filter can significantly reduce the amount of data to be read.
+        * The filter is handed to the wrapped HCatalog input format via its setFilter() method.
+        *
+        * @param filter A SQL-like filter condition on the table's partition columns.
+        * @return This InputFormat with specified partition filter.
+        * @throws java.io.IOException Propagated from the wrapped HCatalog input format.
+        */
+       public HCatInputFormatBase<T> withFilter(String filter) throws 
IOException {
+
+               // set filter
+               this.hCatInputFormat.setFilter(filter);
+
+               return this;
+       }
+
+       /**
+        * Switches the InputFormat from returning
+        * {@link org.apache.hive.hcatalog.data.HCatRecord} to returning Flink tuples.
+        * The tuple fields follow the positions of the current output schema.
+        *
+        * Note: Flink tuples might only support a limited number of fields (depending on the API).
+        *
+        * @return This InputFormat.
+        * @throws org.apache.hive.hcatalog.common.HCatException Propagated from the schema lookups.
+        * @throws IllegalArgumentException If the output schema has more fields than
+        *                                  {@link #getMaxFlinkTupleSize()} allows, or contains an
+        *                                  unsupported field type.
+        */
+       public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
+
+               // reject schemas that do not fit into a Flink tuple
+               final int arity = this.outputSchema.getFields().size();
+               if (arity > this.getMaxFlinkTupleSize()) {
+                       final String message =
+                                       "Only up to " + this.getMaxFlinkTupleSize() + " fields can be returned as Flink tuples.";
+                       throw new IllegalArgumentException(message);
+               }
+
+               // derive name and type of every tuple field from the output schema
+               final TypeInformation[] tupleFieldTypes = new TypeInformation[arity];
+               this.fieldNames = new String[arity];
+               for (String name : this.outputSchema.getFieldNames()) {
+                       final HCatFieldSchema schemaOfField = this.outputSchema.get(name);
+                       final int position = this.outputSchema.getPosition(name);
+
+                       tupleFieldTypes[position] = getFieldType(schemaOfField);
+                       this.fieldNames[position] = name;
+               }
+               this.resultType = new TupleTypeInfo(tupleFieldTypes);
+
+               return this;
+       }
+
+       /**
+        * Returns the largest number of fields a Flink tuple produced by this InputFormat may have.
+        * {@link #asFlinkTuples()} rejects output schemas with more fields than this value.
+        *
+        * @return The maximum supported tuple size.
+        */
+       protected abstract int getMaxFlinkTupleSize();
+
+       /**
+        * Maps the HCatalog type of a field to the Flink type information used for the tuple field.
+        * Primitive types map to basic type infos, BINARY to a primitive byte array, and the
+        * complex types ARRAY, STRUCT and MAP to generic type infos of List and Map.
+        *
+        * @param fieldSchema The schema of the field to map.
+        * @return The Flink type information for the field.
+        * @throws IllegalArgumentException If the field has a type not listed above.
+        */
+       private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
+
+               switch (fieldSchema.getType()) {
+                       // integral types
+                       case TINYINT:
+                               return BasicTypeInfo.BYTE_TYPE_INFO;
+                       case SMALLINT:
+                               return BasicTypeInfo.SHORT_TYPE_INFO;
+                       case INT:
+                               return BasicTypeInfo.INT_TYPE_INFO;
+                       case BIGINT:
+                               return BasicTypeInfo.LONG_TYPE_INFO;
+
+                       // floating point types
+                       case FLOAT:
+                               return BasicTypeInfo.FLOAT_TYPE_INFO;
+                       case DOUBLE:
+                               return BasicTypeInfo.DOUBLE_TYPE_INFO;
+
+                       // remaining simple types
+                       case BOOLEAN:
+                               return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+                       case STRING:
+                               return BasicTypeInfo.STRING_TYPE_INFO;
+                       case BINARY:
+                               return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+
+                       // complex types: arrays and structs are both typed as List
+                       case ARRAY:
+                       case STRUCT:
+                               return new GenericTypeInfo(List.class);
+                       case MAP:
+                               return new GenericTypeInfo(Map.class);
+
+                       default:
+                               throw new IllegalArgumentException("Unknown data type \"" + fieldSchema.getType() + "\" encountered.");
+               }
+       }
+
+       /**
+        * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat.
+        * The internal instance is returned directly, not a copy.
+        *
+        * @return The Configuration of the HCatInputFormat.
+        */
+       public Configuration getConfiguration() {
+               return this.configuration;
+       }
+
+       /**
+        * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the
+        * {@link org.apache.hive.hcatalog.data.HCatRecord} returned by this InputFormat.
+        * This is the full table schema unless {@link #getFields(String...)} replaced it
+        * with a projection.
+        *
+        * @return The HCatSchema of the HCatRecords returned by this InputFormat.
+        */
+       public HCatSchema getOutputSchema() {
+               return this.outputSchema;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  InputFormat
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Intentionally empty: the given Flink configuration is not read by this InputFormat.
+        *
+        * @param parameters Ignored.
+        */
+       @Override
+       public void configure(org.apache.flink.configuration.Configuration 
parameters) {
+               // nothing to do
+       }
+
+       /**
+        * Always returns {@code null}; this InputFormat does not compute statistics.
+        *
+        * @param cachedStats Ignored.
+        * @return Always {@code null}.
+        */
+       @Override
+       public BaseStatistics getStatistics(BaseStatistics cachedStats) throws 
IOException {
+               // no statistics provided at the moment
+               return null;
+       }
+
+       /**
+        * Asks the wrapped HCatalog input format for its splits and wraps each of them into a
+        * {@link HadoopInputSplit}, numbered in the order returned.
+        *
+        * NOTE(review): the requested minimum NUMBER of splits is stored under the
+        * "mapreduce.input.fileinputformat.split.minsize" key - confirm that this is intended.
+        *
+        * @param minNumSplits The minimum number of splits requested by the caller.
+        * @return The splits of the HCatalog table, wrapped as HadoopInputSplits.
+        * @throws IOException If the splits cannot be computed or the computation is interrupted.
+        */
+       @Override
+       public HadoopInputSplit[] createInputSplits(int minNumSplits)
+                       throws IOException {
+               configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
+
+               final JobContext jobContext;
+               try {
+                       jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               final List<InputSplit> splits;
+               try {
+                       splits = this.hCatInputFormat.getSplits(jobContext);
+               } catch (InterruptedException e) {
+                       // restore the interrupt status, it would otherwise be lost by wrapping the exception
+                       Thread.currentThread().interrupt();
+                       throw new IOException("Could not get Splits.", e);
+               }
+
+               final HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
+               for (int i = 0; i < hadoopInputSplits.length; i++) {
+                       hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
+               }
+               return hadoopInputSplits;
+       }
+
+       /**
+        * Creates a {@link LocatableInputSplitAssigner} over the given splits.
+        *
+        * @param inputSplits The splits to be assigned.
+        * @return A new LocatableInputSplitAssigner for the given splits.
+        */
+       @Override
+       public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] 
inputSplits) {
+               return new LocatableInputSplitAssigner(inputSplits);
+       }
+
+       /**
+        * Creates and initializes the record reader for the given split.
+        * The look-ahead state is reset in any case, so that the next call to
+        * {@link #reachedEnd()} or {@link #nextRecord(Object)} fetches from the new reader.
+        *
+        * @param split The split to read.
+        * @throws IOException If the record reader cannot be created or initialized, or if the
+        *                     thread is interrupted while doing so.
+        */
+       @Override
+       public void open(HadoopInputSplit split) throws IOException {
+               final TaskAttemptContext context;
+               try {
+                       context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               try {
+                       this.recordReader = this.hCatInputFormat
+                                       .createRecordReader(split.getHadoopInputSplit(), context);
+                       this.recordReader.initialize(split.getHadoopInputSplit(), context);
+               } catch (InterruptedException e) {
+                       // restore the interrupt status, it would otherwise be lost by wrapping the exception
+                       Thread.currentThread().interrupt();
+                       throw new IOException("Could not create RecordReader.", e);
+               } finally {
+                       this.fetched = false;
+               }
+       }
+
+       /**
+        * Reports whether the current split is exhausted.
+        * If no look-ahead is pending, the reader is advanced once via fetchNext(); the record
+        * fetched that way is the one returned by the next call to {@link #nextRecord(Object)}.
+        *
+        * @return {@code true} if the last advance of the reader found no further record.
+        * @throws IOException Propagated from fetchNext().
+        */
+       @Override
+       public boolean reachedEnd() throws IOException {
+               if(!this.fetched) {
+                       fetchNext();
+               }
+               return !this.hasNext;
+       }
+
+       /**
+        * Advances the record reader by one key/value pair and remembers whether one was found.
+        * The look-ahead is marked as pending even if advancing the reader fails.
+        *
+        * @throws IOException If the reader fails or the thread is interrupted while reading.
+        */
+       private void fetchNext() throws IOException {
+               try {
+                       this.hasNext = this.recordReader.nextKeyValue();
+               } catch (InterruptedException e) {
+                       // restore the interrupt status, it would otherwise be lost by wrapping the exception
+                       Thread.currentThread().interrupt();
+                       throw new IOException("Could not fetch next KeyValue pair.", e);
+               } finally {
+                       this.fetched = true;
+               }
+       }
+
+       /**
+        * Returns the next record of the current split.
+        * The record is returned as a Flink tuple if tuple field names were set by
+        * {@link #asFlinkTuples()}, and as the HCatRecord delivered by the reader otherwise.
+        *
+        * @param record The reuse object; only passed on to buildFlinkTuple() in tuple mode.
+        * @return The next record, or {@code null} if the split is exhausted.
+        * @throws IOException If the record cannot be read or converted, or if the thread is
+        *                     interrupted while reading.
+        */
+       @Override
+       public T nextRecord(T record) throws IOException {
+               if (!this.fetched) {
+                       // no look-ahead pending, e.g. for the first record
+                       fetchNext();
+               }
+               if (!this.hasNext) {
+                       return null;
+               }
+
+               // get next HCatRecord
+               final HCatRecord value;
+               try {
+                       value = this.recordReader.getCurrentValue();
+               } catch (InterruptedException e) {
+                       // restore the interrupt status, it would otherwise be lost by wrapping the exception
+                       Thread.currentThread().interrupt();
+                       throw new IOException("Could not get next record.", e);
+               }
+               // the look-ahead is consumed, the next call has to advance the reader again
+               this.fetched = false;
+
+               if (this.fieldNames.length > 0) {
+                       // return as Flink tuple
+                       return this.buildFlinkTuple(record, value);
+               }
+
+               // return as HCatRecord; without tuple field names T is expected to be HCatRecord
+               @SuppressWarnings("unchecked")
+               final T hCatRecord = (T) value;
+               return hCatRecord;
+       }
+
+       /**
+        * Converts a HCatRecord into the Flink tuple returned by {@link #nextRecord(Object)}.
+        * Only called when tuple field names were set by {@link #asFlinkTuples()}.
+        *
+        * @param t The reuse object that was passed to nextRecord().
+        * @param record The HCatRecord read from the current split.
+        * @return The tuple representation of the record.
+        * @throws HCatException Declared for implementations that fail to read the record's fields.
+        */
+       protected abstract T buildFlinkTuple(T t, HCatRecord record) throws 
HCatException;
+
+       /**
+        * Closes the record reader of the current split.
+        * Does nothing if no record reader exists, i.e. if {@link #open(HadoopInputSplit)} was
+        * never called or failed before a reader was created. Without this guard a close() after
+        * a failed open() would throw a NullPointerException and hide the original failure.
+        *
+        * @throws IOException If closing the record reader fails.
+        */
+       @Override
+       public void close() throws IOException {
+               if (this.recordReader != null) {
+                       this.recordReader.close();
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Custom de/serialization methods
+       // 
--------------------------------------------------------------------------------------------
+
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.writeInt(this.fieldNames.length);
+               for(String fieldName : this.fieldNames) {
+                       out.writeUTF(fieldName);
+               }
+               this.configuration.write(out);
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               this.fieldNames = new String[in.readInt()];
+               for(int i=0; i<this.fieldNames.length; i++) {
+                       this.fieldNames[i] = in.readUTF();
+               }
+
+               Configuration configuration = new Configuration();
+               configuration.readFields(in);
+
+               if(this.configuration == null) {
+                       this.configuration = configuration;
+               }
+
+               this.hCatInputFormat = new 
org.apache.hive.hcatalog.mapreduce.HCatInputFormat();
+               this.outputSchema = 
(HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Result type business
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the type information of the records produced by this InputFormat, as set by the
+        * three-argument constructor or by {@link #asFlinkTuples()}.
+        *
+        * NOTE(review): resultType is neither written by writeObject() nor restored by
+        * readObject() - confirm that this method is only called on the original instance.
+        *
+        * @return The type information of the produced records.
+        */
+       @Override
+       public TypeInformation<T> getProducedType() {
+               return this.resultType;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
 
b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
new file mode 100644
index 0000000..46f3cd5
--- /dev/null
+++ 
b/flink-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.java;
+
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.hcatalog.HCatInputFormatBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and 
partition filters.
+ *
+ * Data can be returned as {@link HCatRecord} or Flink {@link 
org.apache.flink.api.java.tuple.Tuple}.
+ * Flink tuples support only up to 25 fields.
+ *
+ * @param <T> The type of the records produced by this InputFormat.
+ */
+public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
+       private static final long serialVersionUID = 1L;
+
+       public HCatInputFormat() {} // no-argument constructor; sets no state here
+
+       public HCatInputFormat(String database, String table) throws Exception {
+               super(database, table); // all setup is delegated to HCatInputFormatBase
+       }
+
+       public HCatInputFormat(String database, String table, Configuration 
config) throws Exception {
+               super(database, table, config); // same delegation, with a caller-supplied Hadoop Configuration
+       }
+
+
+       @Override
+       protected int getMaxFlinkTupleSize() {
+               return 25; // Flink Java tuples support at most 25 fields
+       }
+
+       @Override
+       protected T buildFlinkTuple(T t, HCatRecord record) throws 
HCatException {
+
+               Tuple tuple = (Tuple)t;
+
+               // Extract all fields from HCatRecord
+               for(int i=0; i < this.fieldNames.length; i++) {
+
+                       // get field value
+                       Object o = record.get(this.fieldNames[i], 
this.outputSchema);
+
+                       // Set field value in Flink tuple.
+                       // Partition columns are returned as String and
+                       //   need to be converted to original type.
+                       switch(this.outputSchema.get(i).getType()) {
+                               case INT:
+                                       if(o instanceof String) {
+                                               
tuple.setField(Integer.parseInt((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case TINYINT:
+                                       if(o instanceof String) {
+                                               
tuple.setField(Byte.parseByte((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case SMALLINT:
+                                       if(o instanceof String) {
+                                               
tuple.setField(Short.parseShort((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case BIGINT:
+                                       if(o instanceof String) {
+                                               
tuple.setField(Long.parseLong((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case BOOLEAN:
+                                       if(o instanceof String) {
+                                               
tuple.setField(Boolean.parseBoolean((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case FLOAT:
+                                       if(o instanceof String) {
+                                               
tuple.setField(Float.parseFloat((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case DOUBLE:
+                                       if(o instanceof String) {
+                                               
tuple.setField(Double.parseDouble((String) o), i);
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case STRING:
+                                       tuple.setField(o, i);
+                                       break;
+                               case BINARY:
+                                       if(o instanceof String) {
+                                               throw new 
RuntimeException("Cannot handle partition keys of type BINARY.");
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case ARRAY:
+                                       if(o instanceof String) {
+                                               throw new 
RuntimeException("Cannot handle partition keys of type ARRAY.");
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case MAP:
+                                       if(o instanceof String) {
+                                               throw new 
RuntimeException("Cannot handle partition keys of type MAP.");
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               case STRUCT:
+                                       if(o instanceof String) {
+                                               throw new 
RuntimeException("Cannot handle partition keys of type STRUCT.");
+                                       } else {
+                                               tuple.setField(o, i);
+                                       }
+                                       break;
+                               default:
+                                       throw new RuntimeException("Invalid 
Type");
+                       }
+               }
+
+               return (T)tuple;
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
 
b/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
new file mode 100644
index 0000000..0299ee1
--- /dev/null
+++ 
b/flink-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hcatalog.scala
+
+import org.apache.flink.configuration
+import org.apache.flink.hcatalog.HCatInputFormatBase
+import org.apache.hadoop.conf.Configuration
+import org.apache.hive.hcatalog.data.HCatRecord
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
+
+/**
+ * An InputFormat to read from HCatalog tables.
+ * The InputFormat supports projection (selection and order of fields) and 
partition filters.
+ *
+ * Data can be returned as [[HCatRecord]] or Scala tuples.
+ * Scala tuples support only up to 22 fields.
+ *
+ */
+class HCatInputFormat[T](
+                        database: String,
+                        table: String,
+                        config: Configuration
+                          ) extends HCatInputFormatBase[T](database, table, 
config) {
+
+  def this(database: String, table: String) {
+    this(database, table, new Configuration) // falls back to a freshly created Hadoop Configuration
+  }
+
+  var vals: Array[Any] = Array[Any]() // per-record field buffer; empty until configure() sizes it
+
+  override def configure(parameters: configuration.Configuration): Unit = {
+    super.configure(parameters)
+    vals = new Array[Any](fieldNames.length) // one slot per selected field
+  }
+
+  override protected def getMaxFlinkTupleSize: Int = 22 // Scala tuples support at most 22 fields
+
+  /**
+   * Converts one [[HCatRecord]] into a Scala tuple.
+   *
+   * Each selected field is read from the record, converted to the type declared in
+   * the output schema, stored in `vals` and finally wrapped by `createScalaTuple`.
+   * Partition columns are delivered as `String` and are parsed back into their
+   * declared type. Out-of-range TINYINT / SMALLINT partition values are rejected
+   * with a `NumberFormatException` instead of being silently wrapped, which matches
+   * the `Byte.parseByte` / `Short.parseShort` behavior of the Java variant.
+   *
+   * @param t      reuse object; not used, a new tuple is created for every record
+   * @param record record the field values are read from
+   * @return a new Scala tuple holding the converted field values
+   */
+  override protected def buildFlinkTuple(t: T, record: HCatRecord): T = {
+
+    // Extract all fields from HCatRecord
+    var i: Int = 0
+    while (i < this.fieldNames.length) {
+
+      val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema)
+
+      // partition columns are returned as String
+      //   Check and convert to actual type.
+      this.outputSchema.get(i).getType match {
+        case HCatFieldSchema.Type.INT =>
+          vals(i) = o match {
+            case s: String => s.toInt
+            case _ => o.asInstanceOf[Int]
+          }
+        case HCatFieldSchema.Type.TINYINT =>
+          vals(i) = o match {
+            // toByte range-checks; toInt.toByte would wrap e.g. "300" to 44
+            case s: String => s.toByte
+            case _ => o.asInstanceOf[Byte]
+          }
+        case HCatFieldSchema.Type.SMALLINT =>
+          vals(i) = o match {
+            // toShort range-checks; toInt.toShort would wrap out-of-range values
+            case s: String => s.toShort
+            case _ => o.asInstanceOf[Short]
+          }
+        case HCatFieldSchema.Type.BIGINT =>
+          vals(i) = o match {
+            case s: String => s.toLong
+            case _ => o.asInstanceOf[Long]
+          }
+        case HCatFieldSchema.Type.BOOLEAN =>
+          vals(i) = o match {
+            case s: String => s.toBoolean
+            case _ => o.asInstanceOf[Boolean]
+          }
+        case HCatFieldSchema.Type.FLOAT =>
+          vals(i) = o match {
+            case s: String => s.toFloat
+            case _ => o.asInstanceOf[Float]
+          }
+        case HCatFieldSchema.Type.DOUBLE =>
+          vals(i) = o match {
+            case s: String => s.toDouble
+            case _ => o.asInstanceOf[Double]
+          }
+        case HCatFieldSchema.Type.STRING =>
+          vals(i) = o
+        case HCatFieldSchema.Type.BINARY =>
+          if (o.isInstanceOf[String]) {
+            throw new RuntimeException("Cannot handle partition keys of type BINARY.")
+          }
+          vals(i) = o.asInstanceOf[Array[Byte]]
+        case HCatFieldSchema.Type.ARRAY =>
+          if (o.isInstanceOf[String]) {
+            throw new RuntimeException("Cannot handle partition keys of type ARRAY.")
+          }
+          // Stored without a cast: an unqualified List here is Scala's immutable List,
+          // and casting a Java collection to it fails with a ClassCastException.
+          // NOTE(review): HCatalog is expected to return java.util.List here - confirm.
+          vals(i) = o
+        case HCatFieldSchema.Type.MAP =>
+          if (o.isInstanceOf[String]) {
+            throw new RuntimeException("Cannot handle partition keys of type MAP.")
+          }
+          // Same reasoning as for ARRAY (expected to be a java.util.Map - confirm).
+          vals(i) = o
+        case HCatFieldSchema.Type.STRUCT =>
+          if (o.isInstanceOf[String]) {
+            throw new RuntimeException("Cannot handle partition keys of type STRUCT.")
+          }
+          // Same reasoning as for ARRAY (expected to be a java.util.List - confirm).
+          vals(i) = o
+        case _ =>
+          throw new RuntimeException("Invalid type " + this.outputSchema.get(i).getType +
+            " encountered.")
+      }
+
+      i += 1
+    }
+    createScalaTuple(vals)
+  }
+
+  private def createScalaTuple(vals: Array[Any]): T = { // wraps the first N values into a TupleN, N = fieldNames.length
+
+    this.fieldNames.length match { // arity is taken from the selected fields, not from vals.length
+      case 1 =>
+        new Tuple1(vals(0)).asInstanceOf[T]
+      case 2 =>
+        new Tuple2(vals(0), vals(1)).asInstanceOf[T]
+      case 3 =>
+        new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T]
+      case 4 =>
+        new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T]
+      case 5 =>
+        new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T]
+      case 6 =>
+        new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), 
vals(5)).asInstanceOf[T]
+      case 7 =>
+        new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6)).asInstanceOf[T]
+      case 8 =>
+        new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7))
+          .asInstanceOf[T]
+      case 9 =>
+        new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8)).asInstanceOf[T]
+      case 10 =>
+        new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9)).asInstanceOf[T]
+      case 11 =>
+        new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10)).asInstanceOf[T]
+      case 12 =>
+        new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T]
+      case 13 =>
+        new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T]
+      case 14 =>
+        new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), 
vals(13)).asInstanceOf[T]
+      case 15 =>
+        new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), 
vals(14)).asInstanceOf[T]
+      case 16 =>
+        new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15))
+          .asInstanceOf[T]
+      case 17 =>
+        new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
+          vals(16)).asInstanceOf[T]
+      case 18 =>
+        new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
+          vals(16), vals(17)).asInstanceOf[T]
+      case 19 =>
+        new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
+          vals(16), vals(17), vals(18)).asInstanceOf[T]
+      case 20 =>
+        new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
+          vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T]
+      case 21 =>
+        new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
+          vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T]
+      case 22 =>
+        new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
+          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
+          vals(16), vals(17), vals(18), vals(19), vals(20), 
vals(21)).asInstanceOf[T]
+      case _ => // any other length ends up here, including a length of 0
+        throw new RuntimeException("Only up to 22 fields supported for Scala 
Tuples.")
+
+  }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/pom.xml 
b/flink-connectors/flink-jdbc/pom.xml
new file mode 100644
index 0000000..be42648
--- /dev/null
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+       
+       <modelVersion>4.0.0</modelVersion>
+       
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.2-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-jdbc</artifactId>
+       <name>flink-jdbc</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.derby</groupId>
+                       <artifactId>derby</artifactId>
+                       <version>10.10.1.1</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
new file mode 100644
index 0000000..b4246f5
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * InputFormat to read data from a database and generate Rows.
+ * The InputFormat has to be configured using the supplied InputFormatBuilder.
+ * A valid RowTypeInfo must be properly configured in the builder, e.g.: <br>
+ *
+ * <pre><code>
+ * TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
+ *             BasicTypeInfo.INT_TYPE_INFO,
+ *             BasicTypeInfo.STRING_TYPE_INFO,
+ *             BasicTypeInfo.STRING_TYPE_INFO,
+ *             BasicTypeInfo.DOUBLE_TYPE_INFO,
+ *             BasicTypeInfo.INT_TYPE_INFO
+ *     };
+ *
+ * RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+ *
+ * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ *                             
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+ *                             .setDBUrl("jdbc:derby:memory:ebookshop")
+ *                             .setQuery("select * from books")
+ *                             .setRowTypeInfo(rowTypeInfo)
+ *                             .finish();
+ * </code></pre>
+ *
+ * In order to query the JDBC source in parallel, you need to provide a
+ * parameterized query template (i.e. a valid {@link PreparedStatement}) and
+ * a {@link ParameterValuesProvider} which provides binding values for the
+ * query parameters. E.g.:<br>
+ *
+ * <pre><code>
+ *
+ * Serializable[][] queryParameters = new String[2][1];
+ * queryParameters[0] = new String[]{"Kumar"};
+ * queryParameters[1] = new String[]{"Tan Ah Teck"};
+ *
+ * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ *                             
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+ *                             .setDBUrl("jdbc:derby:memory:ebookshop")
+ *                             .setQuery("select * from books WHERE author = 
?")
+ *                             .setRowTypeInfo(rowTypeInfo)
+ *                             .setParametersProvider(new 
GenericParameterValuesProvider(queryParameters))
+ *                             .finish();
+ * </code></pre>
+ *
+ * @see Row
+ * @see ParameterValuesProvider
+ * @see PreparedStatement
+ * @see DriverManager
+ */
+public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> 
implements ResultTypeQueryable {
+
+       private static final long serialVersionUID = 1L;
+       private static final Logger LOG = 
LoggerFactory.getLogger(JDBCInputFormat.class);
+
+       private String username;
+       private String password;
+       private String drivername;
+       private String dbURL;
+       private String queryTemplate;
+       private int resultSetType;
+       private int resultSetConcurrency;
+       private RowTypeInfo rowTypeInfo;
+
+       private transient Connection dbConn;
+       private transient PreparedStatement statement;
+       private transient ResultSet resultSet;
+
+       private boolean hasNext;
+       private Object[][] parameterValues;
+
+       public JDBCInputFormat() { // empty on purpose; the class Javadoc points to the builder for configuration
+       }
+
+       @Override
+       public RowTypeInfo getProducedType() {
+               return rowTypeInfo; // returns the stored row type unchanged
+       }
+
+       @Override
+       public void configure(Configuration parameters) {
+               // intentionally empty: the given parameters are not used by this format
+       }
+
+       /**
+        * Loads the JDBC driver, opens the connection and prepares the query template.
+        * Credentials are only passed to the driver when a user name has been set.
+        *
+        * @throws IllegalArgumentException if the driver class cannot be found or the
+        *         connection or statement cannot be created
+        */
+       @Override
+       public void openInputFormat() {
+               // invoked once per InputFormat instance, before any split is opened
+               try {
+                       Class.forName(drivername);
+                       dbConn = (username != null)
+                                       ? DriverManager.getConnection(dbURL, username, password)
+                                       : DriverManager.getConnection(dbURL);
+                       statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
+               } catch (ClassNotFoundException cnfe) {
+                       throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
+               } catch (SQLException se) {
+                       throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
+               }
+       }
+
+       /**
+        * Releases the prepared statement and then the connection, and drops the
+        * parameter values. A failure while closing either resource is logged and
+        * not propagated; both references are cleared in any case.
+        */
+       @Override
+       public void closeInputFormat() {
+               // invoked once per InputFormat instance, after the last split
+               if (statement != null) {
+                       try {
+                               statement.close();
+                       } catch (SQLException se) {
+                               LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
+                       } finally {
+                               statement = null;
+                       }
+               }
+
+               if (dbConn != null) {
+                       try {
+                               dbConn.close();
+                       } catch (SQLException se) {
+                               LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+                       } finally {
+                               dbConn = null;
+                       }
+               }
+
+               parameterValues = null;
+       }
+
+       /**
+        * Connects to the source database and executes the query in a <b>parallel
+        * fashion</b> if this {@link InputFormat} is built using a parameterized
+        * query (i.e. using a {@link PreparedStatement}) and a proper
+        * {@link ParameterValuesProvider}, in a <b>non-parallel fashion</b> otherwise.
+        *
+        * <p>When parameter values are present, the row selected by the split number
+        * is bound to the prepared statement before the query is executed. Afterwards
+        * the cursor is moved to the first row so that {@code reachedEnd()} can answer
+        * immediately.
+        *
+        * @param inputSplit which is ignored if this InputFormat is executed as a
+        *        non-parallel source,
+        *        a "hook" to the query parameters otherwise (using its
+        *        <i>splitNumber</i>)
+        * @throws IOException declared by the overridden signature
+        * @throws IllegalArgumentException if a parameter value is {@code null} or of
+        *         an unsupported type, or if binding or executing the query fails
+        */
+       @Override
+       public void open(InputSplit inputSplit) throws IOException {
+               try {
+                       if (inputSplit != null && parameterValues != null) {
+                               // binding values of this split, looked up once instead of on every access
+                               final Object[] splitParameters = parameterValues[inputSplit.getSplitNumber()];
+                               for (int i = 0; i < splitParameters.length; i++) {
+                                       final Object param = splitParameters[i];
+                                       // JDBC parameter indexes are 1-based
+                                       if (param instanceof String) {
+                                               statement.setString(i + 1, (String) param);
+                                       } else if (param instanceof Long) {
+                                               statement.setLong(i + 1, (Long) param);
+                                       } else if (param instanceof Integer) {
+                                               statement.setInt(i + 1, (Integer) param);
+                                       } else if (param instanceof Double) {
+                                               statement.setDouble(i + 1, (Double) param);
+                                       } else if (param instanceof Boolean) {
+                                               statement.setBoolean(i + 1, (Boolean) param);
+                                       } else if (param instanceof Float) {
+                                               statement.setFloat(i + 1, (Float) param);
+                                       } else if (param instanceof BigDecimal) {
+                                               statement.setBigDecimal(i + 1, (BigDecimal) param);
+                                       } else if (param instanceof Byte) {
+                                               statement.setByte(i + 1, (Byte) param);
+                                       } else if (param instanceof Short) {
+                                               statement.setShort(i + 1, (Short) param);
+                                       } else if (param instanceof Date) {
+                                               statement.setDate(i + 1, (Date) param);
+                                       } else if (param instanceof Time) {
+                                               statement.setTime(i + 1, (Time) param);
+                                       } else if (param instanceof Timestamp) {
+                                               statement.setTimestamp(i + 1, (Timestamp) param);
+                                       } else if (param instanceof Array) {
+                                               statement.setArray(i + 1, (Array) param);
+                                       } else {
+                                               //extends with other types if needed
+                                               // a null value has no class to report; name it explicitly so the
+                                               // caller gets this exception rather than a NullPointerException
+                                               final String paramType = (param == null) ? "null" : String.valueOf(param.getClass());
+                                               throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + paramType + " is not handled (yet)." );
+                                       }
+                               }
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(splitParameters)));
+                               }
+                       }
+                       resultSet = statement.executeQuery();
+                       // position on the first row; false means the result is empty
+                       hasNext = resultSet.next();
+               } catch (SQLException se) {
+                       throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
+               }
+       }
+
+       /**
+        * Releases the result set that was opened for the current read, if there is one.
+        *
+        * <p>A failure while closing the result set is only logged and never propagated.
+        *
+        * @throws IOException Declared by the signature; this implementation does not throw it.
+        */
+       @Override
+       public void close() throws IOException {
+               if (resultSet != null) {
+                       try {
+                               resultSet.close();
+                       } catch (SQLException closeFailure) {
+                               LOG.info("Inputformat ResultSet couldn't be closed - " + closeFailure.getMessage());
+                       }
+               }
+       }
+
+       /**
+        * Tells whether every row of the current result set has been handed out.
+        *
+        * @return {@code true} if no further row is available, {@code false} otherwise.
+        * @throws IOException Declared by the signature; this implementation does not throw it.
+        */
+       @Override
+       public boolean reachedEnd() throws IOException {
+               final boolean exhausted = !hasNext;
+               return exhausted;
+       }
+
+       /**
+        * Stores the next resultSet row in a tuple
+        *
+        * @param row row to be reused.
+        * @return row containing next {@link Row}
+        * @throws java.io.IOException
+        */
+       @Override
+       public Row nextRecord(Row row) throws IOException {
+               try {
+                       if (!hasNext) {
+                               return null;
+                       }
+                       for (int pos = 0; pos < row.productArity(); pos++) {
+                               row.setField(pos, resultSet.getObject(pos + 1));
+                       }
+                       //update hasNext after we've read the record
+                       hasNext = resultSet.next();
+                       return row;
+               } catch (SQLException se) {
+                       throw new IOException("Couldn't read data - " + 
se.getMessage(), se);
+               } catch (NullPointerException npe) {
+                       throw new IOException("Couldn't access resultSet", npe);
+               }
+       }
+
+       /**
+        * Hands back the cached statistics unchanged; no statistics are computed here.
+        *
+        * @param cachedStatistics previously cached statistics, possibly {@code null}.
+        * @return the very same object that was passed in.
+        * @throws IOException Declared by the signature; this implementation does not throw it.
+        */
+       @Override
+       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+               return cachedStatistics;
+       }
+
+       /**
+        * Creates one input split per configured set of query parameters, or a single
+        * split when no parameter values have been configured.
+        *
+        * @param minNumSplits ignored by this implementation.
+        * @return the generated splits, numbered from 0.
+        * @throws IOException Declared by the signature; this implementation does not throw it.
+        */
+       @Override
+       public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
+               final int splitCount = (parameterValues == null) ? 1 : parameterValues.length;
+               final GenericInputSplit[] splits = new GenericInputSplit[splitCount];
+               int partition = 0;
+               while (partition < splitCount) {
+                       splits[partition] = new GenericInputSplit(partition, splitCount);
+                       partition++;
+               }
+               return splits;
+       }
+
+       /**
+        * Wraps the given splits in a {@link DefaultInputSplitAssigner}.
+        *
+        * @param inputSplits the splits to be assigned.
+        * @return a new assigner over exactly these splits.
+        */
+       @Override
+       public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
+               return new DefaultInputSplitAssigner(inputSplits);
+       }
+
+       /**
+        * Entry point for configuring a {@link JDBCInputFormat} in a fluent way.
+        *
+        * @return a fresh builder for this input format.
+        */
+       public static JDBCInputFormatBuilder buildJDBCInputFormat() {
+               return new JDBCInputFormatBuilder();
+       }
+
+       /**
+        * Fluent builder for {@link JDBCInputFormat}.
+        *
+        * <p>Every setter writes straight into the format instance that {@link #finish()} returns.
+        */
+       public static class JDBCInputFormatBuilder {
+               private final JDBCInputFormat format;
+
+               /**
+                * Creates a builder whose format defaults to a forward-only, read-only result set.
+                */
+               public JDBCInputFormatBuilder() {
+                       final JDBCInputFormat inputFormat = new JDBCInputFormat();
+                       //using TYPE_FORWARD_ONLY for high performance reads
+                       inputFormat.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
+                       inputFormat.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
+                       this.format = inputFormat;
+               }
+
+               /**
+                * @param username database user; may stay unset.
+                * @return this builder.
+                */
+               public JDBCInputFormatBuilder setUsername(String username) {
+                       format.username = username;
+                       return this;
+               }
+
+               /**
+                * @param password database password; may stay unset.
+                * @return this builder.
+                */
+               public JDBCInputFormatBuilder setPassword(String password) {
+                       format.password = password;
+                       return this;
+               }
+
+               /**
+                * @param drivername class name of the JDBC driver (mandatory).
+                * @return this builder.
+                */
+               public JDBCInputFormatBuilder setDrivername(String drivername) {
+                       format.drivername = drivername;
+                       return this;
+               }
+
+               /**
+                * @param dbURL JDBC URL of the database (mandatory).
+                * @return this builder.
+                */
+               public JDBCInputFormatBuilder setDBUrl(String dbURL) {
+                       format.dbURL = dbURL;
+                       return this;
+               }
+
+               /**
+                * @param query query template to execute (mandatory).
+                * @return this builder.
+                */
+               public JDBCInputFormatBuilder setQuery(String query) {
+                       format.queryTemplate = query;
+                       return this;
+               }
+
+               /**
+                * @param resultSetType one of the {@code ResultSet.TYPE_*} constants.
+                * @return this builder.
+                */
+               public JDBCInputFormatBuilder setResultSetType(int resultSetType) {
+                       format.resultSetType = resultSetType;
+                       return this;
+               }
+
+               /**
+                * @param resultSetConcurrency one of the {@code ResultSet.CONCUR_*} constants.
+                * @return this builder.
+                */
+               public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) {
+                       format.resultSetConcurrency = resultSetConcurrency;
+                       return this;
+               }
+
+               /**
+                * Stores the parameter values of the given provider; they are fetched immediately.
+                *
+                * @param parameterValuesProvider source of the query parameter values, must not be {@code null}.
+                * @return this builder.
+                */
+               public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
+                       format.parameterValues = parameterValuesProvider.getParameterValues();
+                       return this;
+               }
+
+               /**
+                * @param rowTypeInfo type information of the produced rows (mandatory).
+                * @return this builder.
+                */
+               public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
+                       format.rowTypeInfo = rowTypeInfo;
+                       return this;
+               }
+
+               /**
+                * Validates the configuration and returns the configured format.
+                *
+                * @return the configured {@link JDBCInputFormat}.
+                * @throws IllegalArgumentException if the database URL, the query, the driver name
+                * or the row type information has not been supplied.
+                */
+               public JDBCInputFormat finish() {
+                       // missing credentials are legal, they are only reported
+                       if (format.username == null) {
+                               LOG.info("Username was not supplied separately.");
+                       }
+                       if (format.password == null) {
+                               LOG.info("Password was not supplied separately.");
+                       }
+                       requireSupplied(format.dbURL, "No database URL supplied");
+                       requireSupplied(format.queryTemplate, "No query supplied");
+                       requireSupplied(format.drivername, "No driver supplied");
+                       requireSupplied(format.rowTypeInfo, "No " + RowTypeInfo.class.getSimpleName() + " supplied");
+                       if (format.parameterValues == null) {
+                               LOG.debug("No input splitting configured (data will be read with parallelism 1).");
+                       }
+                       return format;
+               }
+
+               /** Rejects a mandatory setting that is still {@code null}. */
+               private static void requireSupplied(Object setting, String complaint) {
+                       if (setting == null) {
+                               throw new IllegalArgumentException(complaint);
+                       }
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
new file mode 100644
index 0000000..da4b1ad
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OutputFormat to write {@link Row}s into a database via JDBC.
+ * The OutputFormat has to be configured using the supplied {@link JDBCOutputFormatBuilder}.
+ *
+ * @see Tuple
+ * @see Row
+ * @see DriverManager
+ */
+public class JDBCOutputFormat extends RichOutputFormat<Row> {
+       private static final long serialVersionUID = 1L;
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(JDBCOutputFormat.class);
+       
+       private String username;
+       private String password;
+       private String drivername;
+       private String dbURL;
+       private String query;
+       private int batchInterval = 5000;
+       
+       private Connection dbConn;
+       private PreparedStatement upload;
+       
+       private int batchCount = 0;
+       
+       public int[] typesArray;
+       
+       /**
+        * Creates an unconfigured format; all settings are left at their field defaults.
+        */
+       public JDBCOutputFormat() {
+               // intentionally empty
+       }
+       
+       /**
+        * Does nothing; the given configuration is not read by this format.
+        *
+        * @param parameters ignored.
+        */
+       @Override
+       public void configure(Configuration parameters) {
+               // nothing to configure
+       }
+       
+       /**
+        * Opens the database connection and prepares the statement used for writing.
+        *
+        * @param taskNumber The number of the parallel instance (not used by this implementation).
+        * @param numTasks The number of parallel tasks (not used by this implementation).
+        * @throws IOException Declared by the signature; this implementation does not throw it.
+        * @throws IllegalArgumentException if the driver class cannot be found, or if connecting
+        * to the database or preparing the statement fails.
+        */
+       @Override
+       public void open(int taskNumber, int numTasks) throws IOException {
+               try {
+                       establishConnection();
+                       upload = dbConn.prepareStatement(query);
+               } catch (ClassNotFoundException driverMissing) {
+                       throw new IllegalArgumentException("JDBC driver class not found.", driverMissing);
+               } catch (SQLException sqlFailure) {
+                       throw new IllegalArgumentException("open() failed.", sqlFailure);
+               }
+       }
+       
+       private void establishConnection() throws SQLException, 
ClassNotFoundException {
+               Class.forName(drivername);
+               if (username == null) {
+                       dbConn = DriverManager.getConnection(dbURL);
+               } else {
+                       dbConn = DriverManager.getConnection(dbURL, username, 
password);
+               }
+       }
+       
+       /**
+        * Adds a record to the prepared statement.
+        * <p>
+        * When this method is called, the output format is guaranteed to be 
opened.
+        * </p>
+        * 
+        * WARNING: this may fail when no column types specified (because a 
best effort approach is attempted in order to
+        * insert a null value but it's not guaranteed that the JDBC driver 
handles PreparedStatement.setObject(pos, null))
+        *
+        * @param row The records to add to the output.
+        * @see PreparedStatement
+        * @throws IOException Thrown, if the records could not be added due to 
an I/O problem.
+        */
+       @Override
+       public void writeRecord(Row row) throws IOException {
+
+               if (typesArray != null && typesArray.length > 0 && 
typesArray.length != row.productArity()) {
+                       LOG.warn("Column SQL types array doesn't match arity of 
passed Row! Check the passed array...");
+               } 
+               try {
+
+                       if (typesArray == null ) {
+                               // no types provided
+                               for (int index = 0; index < row.productArity(); 
index++) {
+                                       LOG.warn("Unknown column type for 
column %s. Best effort approach to set its value: %s.", index + 1, 
row.productElement(index));
+                                       upload.setObject(index + 1, 
row.productElement(index));
+                               }
+                       } else {
+                               // types provided
+                               for (int index = 0; index < row.productArity(); 
index++) {
+
+                                       if (row.productElement(index) == null) {
+                                               upload.setNull(index + 1, 
typesArray[index]);
+                                       } else {
+                                               // casting values as suggested 
by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
+                                               switch (typesArray[index]) {
+                                                       case 
java.sql.Types.NULL:
+                                                               
upload.setNull(index + 1, typesArray[index]);
+                                                               break;
+                                                       case 
java.sql.Types.BOOLEAN:
+                                                       case java.sql.Types.BIT:
+                                                               
upload.setBoolean(index + 1, (boolean) row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.CHAR:
+                                                       case 
java.sql.Types.NCHAR:
+                                                       case 
java.sql.Types.VARCHAR:
+                                                       case 
java.sql.Types.LONGVARCHAR:
+                                                       case 
java.sql.Types.LONGNVARCHAR:
+                                                               
upload.setString(index + 1, (String) row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.TINYINT:
+                                                               
upload.setByte(index + 1, (byte) row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.SMALLINT:
+                                                               
upload.setShort(index + 1, (short) row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.INTEGER:
+                                                               
upload.setInt(index + 1, (int) row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.BIGINT:
+                                                               
upload.setLong(index + 1, (long) row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.REAL:
+                                                               
upload.setFloat(index + 1, (float) row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.FLOAT:
+                                                       case 
java.sql.Types.DOUBLE:
+                                                               
upload.setDouble(index + 1, (double) row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.DECIMAL:
+                                                       case 
java.sql.Types.NUMERIC:
+                                                               
upload.setBigDecimal(index + 1, (java.math.BigDecimal) 
row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.DATE:
+                                                               
upload.setDate(index + 1, (java.sql.Date) row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.TIME:
+                                                               
upload.setTime(index + 1, (java.sql.Time) row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.TIMESTAMP:
+                                                               
upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index));
+                                                               break;
+                                                       case 
java.sql.Types.BINARY:
+                                                       case 
java.sql.Types.VARBINARY:
+                                                       case 
java.sql.Types.LONGVARBINARY:
+                                                               
upload.setBytes(index + 1, (byte[]) row.productElement(index));
+                                                               break;
+                                                       default:
+                                                               
upload.setObject(index + 1, row.productElement(index));
+                                                               
LOG.warn("Unmanaged sql type (%s) for column %s. Best effort approach to set 
its value: %s.",
+                                                                       
typesArray[index], index + 1, row.productElement(index));
+                                                               // case 
java.sql.Types.SQLXML
+                                                               // case 
java.sql.Types.ARRAY:
+                                                               // case 
java.sql.Types.JAVA_OBJECT:
+                                                               // case 
java.sql.Types.BLOB:
+                                                               // case 
java.sql.Types.CLOB:
+                                                               // case 
java.sql.Types.NCLOB:
+                                                               // case 
java.sql.Types.DATALINK:
+                                                               // case 
java.sql.Types.DISTINCT:
+                                                               // case 
java.sql.Types.OTHER:
+                                                               // case 
java.sql.Types.REF:
+                                                               // case 
java.sql.Types.ROWID:
+                                                               // case 
java.sql.Types.STRUC
+                                               }
+                                       }
+                               }
+                       }
+                       upload.addBatch();
+                       batchCount++;
+                       if (batchCount >= batchInterval) {
+                               upload.executeBatch();
+                               batchCount = 0;
+                       }
+               } catch (SQLException | IllegalArgumentException e) {
+                       throw new IllegalArgumentException("writeRecord() 
failed", e);
+               }
+       }
+       
+       /**
+        * Executes prepared statement and closes all resources of this 
instance.
+        *
+        * @throws IOException Thrown, if the input could not be closed 
properly.
+        */
+       @Override
+       public void close() throws IOException {
+               try {
+                       if (upload != null) {
+                               upload.executeBatch();
+                               upload.close();
+                       }
+               } catch (SQLException se) {
+                       LOG.info("Inputformat couldn't be closed - " + 
se.getMessage());
+               } finally {
+                       upload = null;
+                       batchCount = 0;
+               }
+               
+               try {
+                       if (dbConn != null) {
+                               dbConn.close();
+                       }
+               } catch (SQLException se) {
+                       LOG.info("Inputformat couldn't be closed - " + 
se.getMessage());
+               } finally {
+                       dbConn = null;
+               }
+       }
+       
+       /**
+        * Entry point for configuring a {@link JDBCOutputFormat} in a fluent way.
+        *
+        * @return a fresh builder for this output format.
+        */
+       public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
+               return new JDBCOutputFormatBuilder();
+       }
+       
+       /**
+        * Fluent builder for {@link JDBCOutputFormat}.
+        *
+        * <p>Every setter writes straight into the format instance that {@link #finish()} returns.
+        */
+       public static class JDBCOutputFormatBuilder {
+               private final JDBCOutputFormat format;
+
+               /**
+                * Creates a builder around a fresh, unconfigured {@link JDBCOutputFormat}.
+                */
+               protected JDBCOutputFormatBuilder() {
+                       this.format = new JDBCOutputFormat();
+               }
+
+               /**
+                * @param username database user; may stay unset.
+                * @return this builder.
+                */
+               public JDBCOutputFormatBuilder setUsername(String username) {
+                       format.username = username;
+                       return this;
+               }
+
+               /**
+                * @param password database password; may stay unset.
+                * @return this builder.
+                */
+               public JDBCOutputFormatBuilder setPassword(String password) {
+                       format.password = password;
+                       return this;
+               }
+
+               /**
+                * @param drivername class name of the JDBC driver (mandatory).
+                * @return this builder.
+                */
+               public JDBCOutputFormatBuilder setDrivername(String drivername) {
+                       format.drivername = drivername;
+                       return this;
+               }
+
+               /**
+                * @param dbURL JDBC URL of the database (mandatory).
+                * @return this builder.
+                */
+               public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
+                       format.dbURL = dbURL;
+                       return this;
+               }
+
+               /**
+                * @param query statement used to write each record (mandatory).
+                * @return this builder.
+                */
+               public JDBCOutputFormatBuilder setQuery(String query) {
+                       format.query = query;
+                       return this;
+               }
+
+               /**
+                * @param batchInterval number of records after which the batch is executed.
+                * @return this builder.
+                */
+               public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
+                       format.batchInterval = batchInterval;
+                       return this;
+               }
+
+               /**
+                * @param typesArray {@link java.sql.Types} codes of the written columns, in column order;
+                * may stay unset.
+                * @return this builder.
+                */
+               public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
+                       format.typesArray = typesArray;
+                       return this;
+               }
+
+               /**
+                * Finalizes the configuration and checks validity.
+                *
+                * @return Configured JDBCOutputFormat
+                * @throws IllegalArgumentException if the database URL, the query or the driver name
+                * has not been supplied.
+                */
+               public JDBCOutputFormat finish() {
+                       // missing credentials are legal, they are only reported
+                       if (format.username == null) {
+                               LOG.info("Username was not supplied separately.");
+                       }
+                       if (format.password == null) {
+                               LOG.info("Password was not supplied separately.");
+                       }
+                       if (format.dbURL == null) {
+                               throw new IllegalArgumentException("No database URL supplied.");
+                       }
+                       if (format.query == null) {
+                               throw new IllegalArgumentException("No query supplied");
+                       }
+                       if (format.drivername == null) {
+                               throw new IllegalArgumentException("No driver supplied");
+                       }
+
+                       return format;
+               }
+       }
+       
+}

Reply via email to