http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 
b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
deleted file mode 100644
index 35b0a7c..0000000
--- 
a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * {@link InputFormat} subclass that wraps the access for HTables.
- */
-public abstract class TableInputFormat<T extends Tuple> extends 
RichInputFormat<T, TableInputSplit> {
-
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(TableInputFormat.class);
-
-       /** helper variable to decide whether the input is exhausted or not */
-       private boolean endReached = false;
-
-       protected transient HTable table = null;
-       protected transient Scan scan = null;
-
-       /** HBase iterator wrapper */
-       private ResultScanner resultScanner = null;
-
-       private byte[] lastRow;
-       private int scannedRows;
-
-       /**
-        * Returns an instance of Scan that retrieves the required subset of 
records from the HBase table.
-        * @return The appropriate instance of Scan for this usecase.
-        */
-       protected abstract Scan getScanner();
-
-       /**
-        * What table is to be read.
-        * Per instance of a TableInputFormat derivative only a single 
tablename is possible.
-        * @return The name of the table
-        */
-       protected abstract String getTableName();
-
-       /**
-        * The output from HBase is always an instance of {@link Result}.
-        * This method is to copy the data in the Result instance into the 
required {@link Tuple}
-        * @param r The Result instance from HBase that needs to be converted
-        * @return The approriate instance of {@link Tuple} that contains the 
needed information.
-        */
-       protected abstract T mapResultToTuple(Result r);
-
-       /**
-        * Creates a {@link Scan} object and opens the {@link HTable} 
connection.
-        * These are opened here because they are needed in the 
createInputSplits
-        * which is called before the openInputFormat method.
-        * So the connection is opened in {@link #configure(Configuration)} and 
closed in {@link #closeInputFormat()}.
-        *
-        * @param parameters The configuration that is to be used
-        * @see Configuration
-        */
-       @Override
-       public void configure(Configuration parameters) {
-               table = createTable();
-               if (table != null) {
-                       scan = getScanner();
-               }
-       }
-
-       /**
-        * Create an {@link HTable} instance and set it into this format
-        */
-       private HTable createTable() {
-               LOG.info("Initializing HBaseConfiguration");
-               //use files found in the classpath
-               org.apache.hadoop.conf.Configuration hConf = 
HBaseConfiguration.create();
-
-               try {
-                       return new HTable(hConf, getTableName());
-               } catch (Exception e) {
-                       LOG.error("Error instantiating a new HTable instance", 
e);
-               }
-               return null;
-       }
-
-       @Override
-       public void open(TableInputSplit split) throws IOException {
-               if (table == null) {
-                       throw new IOException("The HBase table has not been 
opened!");
-               }
-               if (scan == null) {
-                       throw new IOException("getScanner returned null");
-               }
-               if (split == null) {
-                       throw new IOException("Input split is null!");
-               }
-
-               logSplitInfo("opening", split);
-               scan.setStartRow(split.getStartRow());
-               lastRow = split.getEndRow();
-               scan.setStopRow(lastRow);
-
-               resultScanner = table.getScanner(scan);
-               endReached = false;
-               scannedRows = 0;
-       }
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               return endReached;
-       }
-
-       @Override
-       public T nextRecord(T reuse) throws IOException {
-               if (resultScanner == null) {
-                       throw new IOException("No table result scanner 
provided!");
-               }
-               try {
-                       Result res = resultScanner.next();
-                       if (res != null) {
-                               scannedRows++;
-                               lastRow = res.getRow();
-                               return mapResultToTuple(res);
-                       }
-               } catch (Exception e) {
-                       resultScanner.close();
-                       //workaround for timeout on scan
-                       LOG.warn("Error after scan of " + scannedRows + " rows. 
Retry with a new scanner...", e);
-                       scan.setStartRow(lastRow);
-                       resultScanner = table.getScanner(scan);
-                       Result res = resultScanner.next();
-                       if (res != null) {
-                               scannedRows++;
-                               lastRow = res.getRow();
-                               return mapResultToTuple(res);
-                       }
-               }
-
-               endReached = true;
-               return null;
-       }
-
-       @Override
-       public void close() throws IOException {
-               LOG.info("Closing split (scanned {} rows)", scannedRows);
-               lastRow = null;
-               try {
-                       if (resultScanner != null) {
-                               resultScanner.close();
-                       }
-               } finally {
-                       resultScanner = null;
-               }
-       }
-
-       @Override
-       public void closeInputFormat() throws IOException {
-               try {
-                       if (table != null) {
-                               table.close();
-                       }
-               } finally {
-                       table = null;
-               }
-       }
-
-       @Override
-       public TableInputSplit[] createInputSplits(final int minNumSplits) 
throws IOException {
-               if (table == null) {
-                       throw new IOException("The HBase table has not been 
opened!");
-               }
-               if (scan == null) {
-                       throw new IOException("getScanner returned null");
-               }
-
-               //Gets the starting and ending row keys for every region in the 
currently open table
-               final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
-               if (keys == null || keys.getFirst() == null || 
keys.getFirst().length == 0) {
-                       throw new IOException("Expecting at least one region.");
-               }
-               final byte[] startRow = scan.getStartRow();
-               final byte[] stopRow = scan.getStopRow();
-               final boolean scanWithNoLowerBound = startRow.length == 0;
-               final boolean scanWithNoUpperBound = stopRow.length == 0;
-
-               final List<TableInputSplit> splits = new 
ArrayList<TableInputSplit>(minNumSplits);
-               for (int i = 0; i < keys.getFirst().length; i++) {
-                       final byte[] startKey = keys.getFirst()[i];
-                       final byte[] endKey = keys.getSecond()[i];
-                       final String regionLocation = 
table.getRegionLocation(startKey, false).getHostnamePort();
-                       //Test if the given region is to be included in the 
InputSplit while splitting the regions of a table
-                       if (!includeRegionInSplit(startKey, endKey)) {
-                               continue;
-                       }
-                       //Finds the region on which the given row is being 
served
-                       final String[] hosts = new String[]{regionLocation};
-
-                       // determine if regions contains keys used by the scan
-                       boolean isLastRegion = endKey.length == 0;
-                       if ((scanWithNoLowerBound || isLastRegion || 
Bytes.compareTo(startRow, endKey) < 0) &&
-                               (scanWithNoUpperBound || 
Bytes.compareTo(stopRow, startKey) > 0)) {
-
-                               final byte[] splitStart = scanWithNoLowerBound 
|| Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
-                               final byte[] splitStop = (scanWithNoUpperBound 
|| Bytes.compareTo(endKey, stopRow) <= 0)
-                                       && !isLastRegion ? endKey : stopRow;
-                               int id = splits.size();
-                               final TableInputSplit split = new 
TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
-                               splits.add(split);
-                       }
-               }
-               LOG.info("Created " + splits.size() + " splits");
-               for (TableInputSplit split : splits) {
-                       logSplitInfo("created", split);
-               }
-               return splits.toArray(new TableInputSplit[0]);
-       }
-
-       private void logSplitInfo(String action, TableInputSplit split) {
-               int splitId = split.getSplitNumber();
-               String splitStart = Bytes.toString(split.getStartRow());
-               String splitEnd = Bytes.toString(split.getEndRow());
-               String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
-               String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
-               String[] hostnames = split.getHostnames();
-               LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, 
splitId, hostnames, splitStartKey, splitStopKey);
-       }
-
-       /**
-        * Test if the given region is to be included in the InputSplit while 
splitting the regions of a table.
-        * <p>
-        * This optimization is effective when there is a specific reasoning to 
exclude an entire region from the M-R job,
-        * (and hence, not contributing to the InputSplit), given the start and 
end keys of the same. <br>
-        * Useful when we need to remember the last-processed top record and 
revisit the [last, current) interval for M-R
-        * processing, continuously. In addition to reducing InputSplits, 
reduces the load on the region server as well, due
-        * to the ordering of the keys. <br>
-        * <br>
-        * Note: It is possible that <code>endKey.length() == 0 </code> , for 
the last (recent) region. <br>
-        * Override this method, if you want to bulk exclude regions altogether 
from M-R. By default, no region is excluded(
-        * i.e. all regions are included).
-        *
-        * @param startKey Start key of the region
-        * @param endKey   End key of the region
-        * @return true, if this region needs to be included as part of the 
input (default).
-        */
-       protected boolean includeRegionInSplit(final byte[] startKey, final 
byte[] endKey) {
-               return true;
-       }
-
-       @Override
-       public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] 
inputSplits) {
-               return new LocatableInputSplitAssigner(inputSplits);
-       }
-
-       @Override
-       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-               return null;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
 
b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
deleted file mode 100644
index 75f0b9b..0000000
--- 
a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.core.io.LocatableInputSplit;
-
-/**
- * This class implements a input splits for HBase. Each table input split 
corresponds to a key range (low, high). All
- * references to row below refer to the key of the row.
- */
-public class TableInputSplit extends LocatableInputSplit {
-
-       private static final long serialVersionUID = 1L;
-
-       /** The name of the table to retrieve data from */
-       private final byte[] tableName;
-
-       /** The start row of the split. */
-       private final byte[] startRow;
-
-       /** The end row of the split. */
-       private final byte[] endRow;
-
-       /**
-        * Creates a new table input split
-        * 
-        * @param splitNumber
-        *        the number of the input split
-        * @param hostnames
-        *        the names of the hosts storing the data the input split 
refers to
-        * @param tableName
-        *        the name of the table to retrieve data from
-        * @param startRow
-        *        the start row of the split
-        * @param endRow
-        *        the end row of the split
-        */
-       TableInputSplit(final int splitNumber, final String[] hostnames, final 
byte[] tableName, final byte[] startRow,
-                       final byte[] endRow) {
-               super(splitNumber, hostnames);
-
-               this.tableName = tableName;
-               this.startRow = startRow;
-               this.endRow = endRow;
-       }
-
-       /**
-        * Returns the table name.
-        * 
-        * @return The table name.
-        */
-       public byte[] getTableName() {
-               return this.tableName;
-       }
-
-       /**
-        * Returns the start row.
-        * 
-        * @return The start row.
-        */
-       public byte[] getStartRow() {
-               return this.startRow;
-       }
-
-       /**
-        * Returns the end row.
-        * 
-        * @return The end row.
-        */
-       public byte[] getEndRow() {
-               return this.endRow;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
 
b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
deleted file mode 100644
index 3d9f672..0000000
--- 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.log4j.Level;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * By using this class as the super class of a set of tests you will have a 
HBase testing
- * cluster available that is very suitable for writing tests for scanning and 
filtering against.
- * This is usable by any downstream application because the HBase cluster is 
'injected' because
- * a dynamically generated hbase-site.xml is added to the classpath.
- * Because of this classpath manipulation it is not possible to start a second 
testing cluster in the same JVM.
- * So if you have this you should either put all hbase related tests in a 
single class or force surefire to
- * setup a new JVM for each testclass.
- * See: 
http://maven.apache.org/surefire/maven-surefire-plugin/examples/fork-options-and-parallel-execution.html
- */
-//
-// NOTE: The code in this file is based on code from the
-// Apache HBase project, licensed under the Apache License v 2.0
-//
-// 
https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
-//
-public class HBaseTestingClusterAutostarter implements Serializable {
-
-       private static final Log LOG = 
LogFactory.getLog(HBaseTestingClusterAutostarter.class);
-
-       private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
-       private static HBaseAdmin admin = null;
-       private static List<TableName> createdTables = new ArrayList<>();
-
-       private static boolean alreadyRegisteredTestCluster = false;
-
-       protected static void createTable(TableName tableName, byte[] 
columnFamilyName, byte[][] splitKeys) {
-               LOG.info("HBase minicluster: Creating table " + 
tableName.getNameAsString());
-
-               assertNotNull("HBaseAdmin is not initialized successfully.", 
admin);
-               HTableDescriptor desc = new HTableDescriptor(tableName);
-               HColumnDescriptor colDef = new 
HColumnDescriptor(columnFamilyName);
-               desc.addFamily(colDef);
-
-               try {
-                       admin.createTable(desc, splitKeys);
-                       createdTables.add(tableName);
-                       assertTrue("Fail to create the table", 
admin.tableExists(tableName));
-               } catch (IOException e) {
-                       assertNull("Exception found while creating table", e);
-               }
-       }
-
-       protected static HTable openTable(TableName tableName) throws 
IOException {
-               HTable table = (HTable) 
admin.getConnection().getTable(tableName);
-               assertTrue("Fail to create the table", 
admin.tableExists(tableName));
-               return table;
-       }
-
-       private static void deleteTables() {
-               if (admin != null) {
-                       for (TableName tableName : createdTables) {
-                               try {
-                                       if (admin.tableExists(tableName)) {
-                                               admin.disableTable(tableName);
-                                               admin.deleteTable(tableName);
-                                       }
-                               } catch (IOException e) {
-                                       assertNull("Exception found deleting 
the table", e);
-                               }
-                       }
-               }
-       }
-
-       private static void initialize(Configuration conf) {
-               conf = HBaseConfiguration.create(conf);
-               conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
-               try {
-                       admin = TEST_UTIL.getHBaseAdmin();
-               } catch (MasterNotRunningException e) {
-                       assertNull("Master is not running", e);
-               } catch (ZooKeeperConnectionException e) {
-                       assertNull("Cannot connect to ZooKeeper", e);
-               } catch (IOException e) {
-                       assertNull("IOException", e);
-               }
-       }
-
-       @BeforeClass
-       public static void setUp() throws Exception {
-               LOG.info("HBase minicluster: Starting");
-               ((Log4JLogger) RpcServer.LOG).getLogger().setLevel(Level.ALL);
-               ((Log4JLogger) 
AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
-               ((Log4JLogger) 
ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
-
-               TEST_UTIL.startMiniCluster(1);
-
-               // https://issues.apache.org/jira/browse/HBASE-11711
-               TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 
-1);
-
-               // Make sure the zookeeper quorum value contains the right port 
number (varies per run).
-               TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", 
"localhost:" + TEST_UTIL.getZkCluster().getClientPort());
-
-               initialize(TEST_UTIL.getConfiguration());
-               LOG.info("HBase minicluster: Running");
-       }
-
-       private static File hbaseSiteXmlDirectory;
-       private static File hbaseSiteXmlFile;
-
-       /**
-        * This dynamically generates a hbase-site.xml file that is added to 
the classpath.
-        * This way this HBaseMinicluster can be used by an unmodified 
application.
-        * The downside is that this cannot be 'unloaded' so you can have only 
one per JVM.
-        */
-       public static void registerHBaseMiniClusterInClasspath() {
-               if (alreadyRegisteredTestCluster) {
-                       fail("You CANNOT register a second HBase Testing 
cluster in the classpath of the SAME JVM");
-               }
-               File baseDir = new File(System.getProperty("java.io.tmpdir", 
"/tmp/"));
-               hbaseSiteXmlDirectory = new File(baseDir, 
"unittest-hbase-minicluster-" + Math.abs(new Random().nextLong()) + "/");
-
-               if (!hbaseSiteXmlDirectory.mkdirs()) {
-                       fail("Unable to create output directory " + 
hbaseSiteXmlDirectory + " for the HBase minicluster");
-               }
-
-               assertNotNull("The ZooKeeper for the HBase minicluster is 
missing", TEST_UTIL.getZkCluster());
-
-               createHBaseSiteXml(hbaseSiteXmlDirectory, 
TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum"));
-               addDirectoryToClassPath(hbaseSiteXmlDirectory);
-
-               // Avoid starting it again.
-               alreadyRegisteredTestCluster = true;
-       }
-
-       private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, 
String zookeeperQuorum) {
-               hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, 
"hbase-site.xml");
-               // Create the hbase-site.xml file for this run.
-               try {
-                       String hbaseSiteXml = "<?xml version=\"1.0\"?>\n" +
-                               "<?xml-stylesheet type=\"text/xsl\" 
href=\"configuration.xsl\"?>\n" +
-                               "<configuration>\n" +
-                               "  <property>\n" +
-                               "    <name>hbase.zookeeper.quorum</name>\n" +
-                               "    <value>" + zookeeperQuorum + "</value>\n" +
-                               "  </property>\n" +
-                               "</configuration>";
-                       OutputStream fos = new 
FileOutputStream(hbaseSiteXmlFile);
-                       
fos.write(hbaseSiteXml.getBytes(StandardCharsets.UTF_8));
-                       fos.close();
-               } catch (IOException e) {
-                       fail("Unable to create " + hbaseSiteXmlFile);
-               }
-       }
-
-       private static void addDirectoryToClassPath(File directory) {
-               try {
-                       // Get the classloader actually used by 
HBaseConfiguration
-                       ClassLoader classLoader = 
HBaseConfiguration.create().getClassLoader();
-                       if (!(classLoader instanceof URLClassLoader)) {
-                               fail("We should get a URLClassLoader");
-                       }
-
-                       // Make the addURL method accessible
-                       Method method = 
URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
-                       method.setAccessible(true);
-
-                       // Add the directory where we put the hbase-site.xml to 
the classpath
-                       method.invoke(classLoader, directory.toURI().toURL());
-               } catch (MalformedURLException | NoSuchMethodException | 
IllegalAccessException | InvocationTargetException e) {
-                       fail("Unable to add " + directory + " to classpath 
because of this exception: " + e.getMessage());
-               }
-       }
-
-       @AfterClass
-       public static void tearDown() throws Exception {
-               LOG.info("HBase minicluster: Shutting down");
-               deleteTables();
-               hbaseSiteXmlFile.delete();
-               hbaseSiteXmlDirectory.delete();
-               TEST_UTIL.shutdownMiniCluster();
-               LOG.info("HBase minicluster: Down");
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
 
b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
deleted file mode 100644
index 3dddd88..0000000
--- 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
-       private static final String TEST_TABLE_NAME = 
"TableInputFormatTestTable";
-       private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
-       private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
-
-       // These are the row ids AND also the values we will put in the test 
table
-       private static final String[] ROW_IDS = {"000", "111", "222", "333", 
"444", "555", "666", "777", "888", "999"};
-
-       @BeforeClass
-       public static void activateHBaseCluster(){
-               registerHBaseMiniClusterInClasspath();
-       }
-
-       @Before
-       public void createTestTable() throws IOException {
-               TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
-               byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), 
"6".getBytes(), "9".getBytes()};
-               createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
-               HTable table = openTable(tableName);
-
-               for (String rowId : ROW_IDS) {
-                       byte[] rowIdBytes = rowId.getBytes();
-                       Put p = new Put(rowIdBytes);
-                       // Use the rowId as the value to facilitate the testing 
better
-                       p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, 
rowIdBytes);
-                       table.put(p);
-               }
-
-               table.close();
-       }
-
-       class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
-               @Override
-               protected Scan getScanner() {
-                       return new Scan();
-               }
-
-               @Override
-               protected String getTableName() {
-                       return TEST_TABLE_NAME;
-               }
-
-               @Override
-               protected Tuple1<String> mapResultToTuple(Result r) {
-                       return new Tuple1<>(new 
String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
-               }
-       }
-
-       @Test
-       public void testTableInputFormat() {
-               ExecutionEnvironment environment = 
ExecutionEnvironment.getExecutionEnvironment();
-               environment.setParallelism(1);
-
-               DataSet<String> resultDataSet =
-                       environment.createInput(new 
InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
-                               @Override
-                               public String map(Tuple1<String> value) throws 
Exception {
-                                       return value.f0;
-                               }
-                       });
-
-               List<String> resultSet = new ArrayList<>();
-               resultDataSet.output(new 
LocalCollectionOutputFormat<>(resultSet));
-
-               try {
-                       environment.execute("HBase InputFormat Test");
-               } catch (Exception e) {
-                       Assert.fail("HBase InputFormat test failed. " + 
e.getMessage());
-               }
-
-               for (String rowId : ROW_IDS) {
-                       assertTrue("Missing rowId from table: " + rowId, 
resultSet.contains(rowId));
-               }
-
-               assertEquals("The number of records is wrong.", ROW_IDS.length, 
resultSet.size());
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
 
b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
deleted file mode 100644
index 8579dee..0000000
--- 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseFlinkTestConstants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-public class HBaseFlinkTestConstants {
-       
-       public static final byte[] CF_SOME = "someCf".getBytes();
-       public static final byte[] Q_SOME = "someQual".getBytes();
-       public static final String TEST_TABLE_NAME = "test-table";
-       public static final String TMP_DIR = "/tmp/test";
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
 
b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
deleted file mode 100644
index dccf876..0000000
--- 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.addons.hbase.TableInputFormat;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Simple stub for HBase DataSet read
- * 
- * To run the test first create the test table with hbase shell.
- * 
- * Use the following commands:
- * <ul>
- *     <li>create 'test-table', 'someCf'</li>
- *     <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
- *     <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
- * </ul>
- * 
- * The test should return just the first entry.
- * 
- */
-public class HBaseReadExample {
-       public static void main(String[] args) throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               @SuppressWarnings("serial")
-               DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new 
TableInputFormat<Tuple2<String, String>>() {
-                       
-                               @Override
-                               public String getTableName() {
-                                       return 
HBaseFlinkTestConstants.TEST_TABLE_NAME;
-                               }
-
-                               @Override
-                               protected Scan getScanner() {
-                                       Scan scan = new Scan();
-                                       
scan.addColumn(HBaseFlinkTestConstants.CF_SOME, HBaseFlinkTestConstants.Q_SOME);
-                                       return scan;
-                               }
-
-                               private Tuple2<String, String> reuse = new 
Tuple2<String, String>();
-                               
-                               @Override
-                               protected Tuple2<String, String> 
mapResultToTuple(Result r) {
-                                       String key = Bytes.toString(r.getRow());
-                                       String val = 
Bytes.toString(r.getValue(HBaseFlinkTestConstants.CF_SOME, 
HBaseFlinkTestConstants.Q_SOME));
-                                       reuse.setField(key, 0);
-                                       reuse.setField(val, 1);
-                                       return reuse;
-                               }
-               })
-               .filter(new FilterFunction<Tuple2<String,String>>() {
-
-                       @Override
-                       public boolean filter(Tuple2<String, String> t) throws 
Exception {
-                               String val = t.getField(1);
-                               if(val.startsWith("someStr"))
-                                       return true;
-                               return false;
-                       }
-               });
-               
-               hbaseDs.print();
-               
-               // kick off execution.
-               env.execute();
-                               
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
 
b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
deleted file mode 100644
index 483bdff..0000000
--- 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Simple stub for HBase DataSet write
- * 
- * To run the test first create the test table with hbase shell.
- * 
- * Use the following commands:
- * <ul>
- *     <li>create 'test-table', 'someCf'</li>
- * </ul>
- * 
- */
-@SuppressWarnings("serial")
-public class HBaseWriteExample {
-       
-       // 
*************************************************************************
-       //     PROGRAM
-       // 
*************************************************************************
-       
-       public static void main(String[] args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-               
-               // set up the execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               // get input data
-               DataSet<String> text = getTextDataSet(env);
-               
-               DataSet<Tuple2<String, Integer>> counts = 
-                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
-                               text.flatMap(new Tokenizer())
-                               // group by the tuple field "0" and sum up 
tuple field "1"
-                               .groupBy(0)
-                               .sum(1);
-
-               // emit result
-               Job job = Job.getInstance();
-               job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, 
outputTableName);
-               // TODO is "mapred.output.dir" really useful?
-               
job.getConfiguration().set("mapred.output.dir",HBaseFlinkTestConstants.TMP_DIR);
-               counts.map(new RichMapFunction <Tuple2<String,Integer>, 
Tuple2<Text,Mutation>>() {
-                       private transient Tuple2<Text, Mutation> reuse;
-
-                       @Override
-                       public void open(Configuration parameters) throws 
Exception {
-                               super.open(parameters);
-                               reuse = new Tuple2<Text, Mutation>();
-                       }
-
-                       @Override
-                       public Tuple2<Text, Mutation> map(Tuple2<String, 
Integer> t) throws Exception {
-                               reuse.f0 = new Text(t.f0);
-                               Put put = new Put(t.f0.getBytes());
-                               
put.add(HBaseFlinkTestConstants.CF_SOME,HBaseFlinkTestConstants.Q_SOME, 
Bytes.toBytes(t.f1));
-                               reuse.f1 = put;
-                               return reuse;
-                       }
-               }).output(new HadoopOutputFormat<Text, Mutation>(new 
TableOutputFormat<Text>(), job));
-               
-               // execute program
-               env.execute("WordCount (HBase sink) Example");
-       }
-       
-       // 
*************************************************************************
-       //     USER FUNCTIONS
-       // 
*************************************************************************
-       
-       /**
-        * Implements the string tokenizer that splits sentences into words as 
a user-defined
-        * FlatMapFunction. The function takes a line (String) and splits it 
into 
-        * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
-        */
-       public static final class Tokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
-
-               @Override
-               public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out) {
-                       // normalize and split the line
-                       String[] tokens = value.toLowerCase().split("\\W+");
-                       
-                       // emit the pairs
-                       for (String token : tokens) {
-                               if (token.length() > 0) {
-                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
-                               }
-                       }
-               }
-       }
-       
-       // 
*************************************************************************
-       //     UTIL METHODS
-       // 
*************************************************************************
-       private static boolean fileOutput = false;
-       private static String textPath;
-       private static String outputTableName = 
HBaseFlinkTestConstants.TEST_TABLE_NAME;
-       
-       private static boolean parseParameters(String[] args) {
-               
-               if(args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if(args.length == 2) {
-                               textPath = args[0];
-                               outputTableName = args[1];
-                       } else {
-                               System.err.println("Usage: HBaseWriteExample 
<text path> <output table>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing HBaseWriteExample example 
with built-in default data.");
-                       System.out.println("  Provide parameters to read input 
data from a file.");
-                       System.out.println("  Usage: HBaseWriteExample <text 
path> <output table>");
-               }
-               return true;
-       }
-       
-       private static DataSet<String> getTextDataSet(ExecutionEnvironment env) 
{
-               if(fileOutput) {
-                       // read the text file from given input path
-                       return env.readTextFile(textPath);
-               } else {
-                       // get default test text data
-                       return getDefaultTextLineDataSet(env);
-               }
-       }
-       private static DataSet<String> 
getDefaultTextLineDataSet(ExecutionEnvironment env) {
-               return env.fromElements(WORDS);
-       }
-       private static final String[] WORDS = new String[] {
-               "To be, or not to be,--that is the question:--",
-               "Whether 'tis nobler in the mind to suffer",
-               "The slings and arrows of outrageous fortune",
-               "Or to take arms against a sea of troubles,",
-               "And by opposing end them?--To die,--to sleep,--",
-               "No more; and by a sleep to say we end",
-               "The heartache, and the thousand natural shocks",
-               "That flesh is heir to,--'tis a consummation",
-               "Devoutly to be wish'd. To die,--to sleep;--",
-               "To sleep! perchance to dream:--ay, there's the rub;",
-               "For in that sleep of death what dreams may come,",
-               "When we have shuffled off this mortal coil,",
-               "Must give us pause: there's the respect",
-               "That makes calamity of so long life;",
-               "For who would bear the whips and scorns of time,",
-               "The oppressor's wrong, the proud man's contumely,",
-               "The pangs of despis'd love, the law's delay,",
-               "The insolence of office, and the spurns",
-               "That patient merit of the unworthy takes,",
-               "When he himself might his quietus make",
-               "With a bare bodkin? who would these fardels bear,",
-               "To grunt and sweat under a weary life,",
-               "But that the dread of something after death,--",
-               "The undiscover'd country, from whose bourn",
-               "No traveller returns,--puzzles the will,",
-               "And makes us rather bear those ills we have",
-               "Than fly to others that we know not of?",
-               "Thus conscience does make cowards of us all;",
-               "And thus the native hue of resolution",
-               "Is sicklied o'er with the pale cast of thought;",
-               "And enterprises of great pith and moment,",
-               "With this regard, their currents turn awry,",
-               "And lose the name of action.--Soft you now!",
-               "The fair Ophelia!--Nymph, in thy orisons",
-               "Be all my sins remember'd."
-       };
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
 
b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
deleted file mode 100644
index 05398db..0000000
--- 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteStreamExample.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * 
- * This is an example how to write streams into HBase. In this example the
- * stream will be written into a local Hbase but it is possible to adapt this
- * example for an HBase running in a cloud. You need a running local HBase 
with a
- * table "flinkExample" and a column "entry". If your HBase configuration does
- * not fit the hbase-site.xml in the resource folder then you gave to delete 
temporary this
- * hbase-site.xml to execute the example properly.
- * 
- */
-public class HBaseWriteStreamExample {
-
-       public static void main(String[] args) throws Exception {
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment
-                               .getExecutionEnvironment();
-
-               // data stream with random numbers
-               DataStream<String> dataStream = env.addSource(new 
SourceFunction<String>() {
-                       private static final long serialVersionUID = 1L;
-
-                       private volatile boolean isRunning = true;
-
-                       @Override
-                       public void run(SourceContext<String> out) throws 
Exception {
-                               while (isRunning) {
-                                       
out.collect(String.valueOf(Math.floor(Math.random() * 100)));
-                               }
-
-                       }
-
-                       @Override
-                       public void cancel() {
-                               isRunning = false;
-                       }
-               });
-               dataStream.writeUsingOutputFormat(new HBaseOutputFormat());
-
-               env.execute();
-       }
-
-       /**
-        * 
-        * This class implements an OutputFormat for HBase
-        *
-        */
-       private static class HBaseOutputFormat implements OutputFormat<String> {
-
-               private org.apache.hadoop.conf.Configuration conf = null;
-               private HTable table = null;
-               private String taskNumber = null;
-               private int rowNumber = 0;
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void configure(Configuration parameters) {
-                       conf = HBaseConfiguration.create();
-               }
-
-               @Override
-               public void open(int taskNumber, int numTasks) throws 
IOException {
-                       table = new HTable(conf, "flinkExample");
-                       this.taskNumber = String.valueOf(taskNumber);
-               }
-
-               @Override
-               public void writeRecord(String record) throws IOException {
-                       Put put = new Put(Bytes.toBytes(taskNumber + 
rowNumber));
-                       put.add(Bytes.toBytes("entry"), Bytes.toBytes("entry"),
-                                       Bytes.toBytes(rowNumber));
-                       rowNumber++;
-                       table.put(put);
-               }
-
-               @Override
-               public void close() throws IOException {
-                       table.flushCommits();
-                       table.close();
-               }
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties 
b/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
deleted file mode 100644
index 804ff45..0000000
--- 
a/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-# http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=DEBUG, stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.threshold=INFO
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - 
%m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/pom.xml 
b/flink-batch-connectors/flink-hcatalog/pom.xml
deleted file mode 100644
index 6889e5a..0000000
--- a/flink-batch-connectors/flink-hcatalog/pom.xml
+++ /dev/null
@@ -1,182 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-       
-       <modelVersion>4.0.0</modelVersion>
-       
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-batch-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-hcatalog</artifactId>
-       <name>flink-hcatalog</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-hadoop-compatibility_2.10</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.hive.hcatalog</groupId>
-                       <artifactId>hcatalog-core</artifactId>
-                       <version>0.12.0</version>
-                       <exclusions>
-                               <exclusion>
-                                       <groupId>org.json</groupId>
-                                       <artifactId>json</artifactId>
-                               </exclusion>
-                       </exclusions>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.scala-lang</groupId>
-                       <artifactId>scala-library</artifactId>
-                       <scope>provided</scope>
-               </dependency>
-
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <!-- Scala Compiler -->
-                       <plugin>
-                               <groupId>net.alchim31.maven</groupId>
-                               <artifactId>scala-maven-plugin</artifactId>
-                               <executions>
-                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
-                                               scala classes can be resolved later in the (Java) compile phase -->
-                                       <execution>
-                                               <id>scala-compile-first</id>
-                                               <phase>process-resources</phase>
-                                               <goals>
-                                                       <goal>compile</goal>
-                                               </goals>
-                                       </execution>
-
-                                       <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-                                                scala classes can be resolved later in the (Java) test-compile phase -->
-                                       <execution>
-                                               <id>scala-test-compile</id>
-                                               <phase>process-test-resources</phase>
-                                               <goals>
-                                                       <goal>testCompile</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       <jvmArgs>
-                                               <jvmArg>-Xms128m</jvmArg>
-                                               <jvmArg>-Xmx512m</jvmArg>
-                                       </jvmArgs>
-                               </configuration>
-                       </plugin>
-
-                       <!-- Eclipse Integration -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-eclipse-plugin</artifactId>
-                               <version>2.8</version>
-                               <configuration>
-                                       <downloadSources>true</downloadSources>
-                                       <projectnatures>
-                                               <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-                                               <projectnature>org.eclipse.jdt.core.javanature</projectnature>
-                                       </projectnatures>
-                                       <buildcommands>
-                                               <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-                                       </buildcommands>
-                                       <classpathContainers>
-                                               <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-                                               <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-                                       </classpathContainers>
-                                       <excludes>
-                                               <exclude>org.scala-lang:scala-library</exclude>
-                                               <exclude>org.scala-lang:scala-compiler</exclude>
-                                       </excludes>
-                                       <sourceIncludes>
-                                               <sourceInclude>**/*.scala</sourceInclude>
-                                               <sourceInclude>**/*.java</sourceInclude>
-                                       </sourceIncludes>
-                               </configuration>
-                       </plugin>
-
-                       <!-- Adding scala source directories to build path -->
-                       <plugin>
-                               <groupId>org.codehaus.mojo</groupId>
-                               <artifactId>build-helper-maven-plugin</artifactId>
-                               <version>1.7</version>
-                               <executions>
-                                       <!-- Add src/main/scala to eclipse build path -->
-                                       <execution>
-                                               <id>add-source</id>
-                                               <phase>generate-sources</phase>
-                                               <goals>
-                                                       <goal>add-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                               <source>src/main/scala</source>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                                       <!-- Add src/test/scala to eclipse build path -->
-                                       <execution>
-                                               <id>add-test-source</id>
-                                               <phase>generate-test-sources</phase>
-                                               <goals>
-                                                       <goal>add-test-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                               <source>src/test/scala</source>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <!-- Scala Code Style, most of the configuration done via plugin management -->
-                       <plugin>
-                               <groupId>org.scalastyle</groupId>
-                               <artifactId>scalastyle-maven-plugin</artifactId>
-                               <configuration>
-                                       <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-                               </configuration>
-                       </plugin>
-
-               </plugins>
-       </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java b/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
deleted file mode 100644
index 859b706..0000000
--- a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/HCatInputFormatBase.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog;
-
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
-import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.data.DefaultHCatRecord;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and partition filters.
- *
- * Data can be returned as {@link org.apache.hive.hcatalog.data.HCatRecord} or Flink-native tuple.
- *
- * Note: Flink tuples might only support a limited number of fields (depending on the API).
- *
- * @param <T>
- */
-public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       private Configuration configuration;
-
-       private org.apache.hive.hcatalog.mapreduce.HCatInputFormat hCatInputFormat;
-       private RecordReader<WritableComparable, HCatRecord> recordReader;
-       private boolean fetched = false;
-       private boolean hasNext;
-
-       protected String[] fieldNames = new String[0];
-       protected HCatSchema outputSchema;
-
-       private TypeInformation<T> resultType;
-
-       public HCatInputFormatBase() { }
-
-       /**
-        * Creates a HCatInputFormat for the given database and table.
-        * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
-        * The return type of the InputFormat can be changed to Flink-native tuples by calling
-        * {@link HCatInputFormatBase#asFlinkTuples()}.
-        *
-        * @param database The name of the database to read from.
-        * @param table The name of the table to read.
-        * @throws java.io.IOException
-        */
-       public HCatInputFormatBase(String database, String table) throws IOException {
-               this(database, table, new Configuration());
-       }
-
-       /**
-        * Creates a HCatInputFormat for the given database, table, and
-        * {@link org.apache.hadoop.conf.Configuration}.
-        * By default, the InputFormat returns {@link org.apache.hive.hcatalog.data.HCatRecord}.
-        * The return type of the InputFormat can be changed to Flink-native tuples by calling
-        * {@link HCatInputFormatBase#asFlinkTuples()}.
-        *
-        * @param database The name of the database to read from.
-        * @param table The name of the table to read.
-        * @param config The Configuration for the InputFormat.
-        * @throws java.io.IOException
-        */
-       public HCatInputFormatBase(String database, String table, Configuration config) throws IOException {
-               super();
-               this.configuration = config;
-               HadoopUtils.mergeHadoopConf(this.configuration);
-
-               this.hCatInputFormat = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(this.configuration, database, table);
-               this.outputSchema = org.apache.hive.hcatalog.mapreduce.HCatInputFormat.getTableSchema(this.configuration);
-
-               // configure output schema of HCatFormat
-               configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
-               // set type information
-               this.resultType = new WritableTypeInfo(DefaultHCatRecord.class);
-       }
-
-       /**
-        * Specifies the fields which are returned by the InputFormat and their order.
-        *
-        * @param fields The fields and their order which are returned by the InputFormat.
-        * @return This InputFormat with specified return fields.
-        * @throws java.io.IOException
-        */
-       public HCatInputFormatBase<T> getFields(String... fields) throws IOException {
-
-               // build output schema
-               ArrayList<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(fields.length);
-               for(String field : fields) {
-                       fieldSchemas.add(this.outputSchema.get(field));
-               }
-               this.outputSchema = new HCatSchema(fieldSchemas);
-
-               // update output schema configuration
-               configuration.set("mapreduce.lib.hcat.output.schema", HCatUtil.serialize(outputSchema));
-
-               return this;
-       }
-
-       /**
-        * Specifies a SQL-like filter condition on the table's partition columns.
-        * Filter conditions on non-partition columns are invalid.
-        * A partition filter can significantly reduce the amount of data to be read.
-        *
-        * @param filter A SQL-like filter condition on the table's partition columns.
-        * @return This InputFormat with specified partition filter.
-        * @throws java.io.IOException
-        */
-       public HCatInputFormatBase<T> withFilter(String filter) throws IOException {
-
-               // set filter
-               this.hCatInputFormat.setFilter(filter);
-
-               return this;
-       }
-
-       /**
-        * Specifies that the InputFormat returns Flink tuples instead of
-        * {@link org.apache.hive.hcatalog.data.HCatRecord}.
-        *
-        * Note: Flink tuples might only support a limited number of fields (depending on the API).
-        *
-        * @return This InputFormat.
-        * @throws org.apache.hive.hcatalog.common.HCatException
-        */
-       public HCatInputFormatBase<T> asFlinkTuples() throws HCatException {
-
-               // build type information
-               int numFields = outputSchema.getFields().size();
-               if(numFields > this.getMaxFlinkTupleSize()) {
-                       throw new IllegalArgumentException("Only up to "+this.getMaxFlinkTupleSize()+
-                                       " fields can be returned as Flink tuples.");
-               }
-
-               TypeInformation[] fieldTypes = new TypeInformation[numFields];
-               fieldNames = new String[numFields];
-               for (String fieldName : outputSchema.getFieldNames()) {
-                       HCatFieldSchema field = outputSchema.get(fieldName);
-
-                       int fieldPos = outputSchema.getPosition(fieldName);
-                       TypeInformation fieldType = getFieldType(field);
-
-                       fieldTypes[fieldPos] = fieldType;
-                       fieldNames[fieldPos] = fieldName;
-
-               }
-               this.resultType = new TupleTypeInfo(fieldTypes);
-
-               return this;
-       }
-
-       protected abstract int getMaxFlinkTupleSize();
-
-       private TypeInformation getFieldType(HCatFieldSchema fieldSchema) {
-
-               switch(fieldSchema.getType()) {
-                       case INT:
-                               return BasicTypeInfo.INT_TYPE_INFO;
-                       case TINYINT:
-                               return BasicTypeInfo.BYTE_TYPE_INFO;
-                       case SMALLINT:
-                               return BasicTypeInfo.SHORT_TYPE_INFO;
-                       case BIGINT:
-                               return BasicTypeInfo.LONG_TYPE_INFO;
-                       case BOOLEAN:
-                               return BasicTypeInfo.BOOLEAN_TYPE_INFO;
-                       case FLOAT:
-                               return BasicTypeInfo.FLOAT_TYPE_INFO;
-                       case DOUBLE:
-                               return BasicTypeInfo.DOUBLE_TYPE_INFO;
-                       case STRING:
-                               return BasicTypeInfo.STRING_TYPE_INFO;
-                       case BINARY:
-                               return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
-                       case ARRAY:
-                               return new GenericTypeInfo(List.class);
-                       case MAP:
-                               return new GenericTypeInfo(Map.class);
-                       case STRUCT:
-                               return new GenericTypeInfo(List.class);
-                       default:
-                               throw new IllegalArgumentException("Unknown data type \""+fieldSchema.getType()+"\" encountered.");
-               }
-       }
-
-       /**
-        * Returns the {@link org.apache.hadoop.conf.Configuration} of the HCatInputFormat.
-        *
-        * @return The Configuration of the HCatInputFormat.
-        */
-       public Configuration getConfiguration() {
-               return this.configuration;
-       }
-
-       /**
-        * Returns the {@link org.apache.hive.hcatalog.data.schema.HCatSchema} of the {@link org.apache.hive.hcatalog.data.HCatRecord}
-        * returned by this InputFormat.
-        *
-        * @return The HCatSchema of the HCatRecords returned by this InputFormat.
-        */
-       public HCatSchema getOutputSchema() {
-               return this.outputSchema;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  InputFormat
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public void configure(org.apache.flink.configuration.Configuration parameters) {
-               // nothing to do
-       }
-
-       @Override
-       public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-               // no statistics provided at the moment
-               return null;
-       }
-
-       @Override
-       public HadoopInputSplit[] createInputSplits(int minNumSplits)
-                       throws IOException {
-               configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
-
-               JobContext jobContext = null;
-               try {
-                       jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-
-               List<InputSplit> splits;
-               try {
-                       splits = this.hCatInputFormat.getSplits(jobContext);
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not get Splits.", e);
-               }
-               HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
-
-               for(int i = 0; i < hadoopInputSplits.length; i++){
-                       hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
-               }
-               return hadoopInputSplits;
-       }
-
-       @Override
-       public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-               return new LocatableInputSplitAssigner(inputSplits);
-       }
-
-       @Override
-       public void open(HadoopInputSplit split) throws IOException {
-               TaskAttemptContext context = null;
-               try {
-                       context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
-               } catch(Exception e) {
-                       throw new RuntimeException(e);
-               }
-
-               try {
-                       this.recordReader = this.hCatInputFormat
-                                       .createRecordReader(split.getHadoopInputSplit(), context);
-                       this.recordReader.initialize(split.getHadoopInputSplit(), context);
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not create RecordReader.", e);
-               } finally {
-                       this.fetched = false;
-               }
-       }
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               if(!this.fetched) {
-                       fetchNext();
-               }
-               return !this.hasNext;
-       }
-
-       private void fetchNext() throws IOException {
-               try {
-                       this.hasNext = this.recordReader.nextKeyValue();
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not fetch next KeyValue pair.", e);
-               } finally {
-                       this.fetched = true;
-               }
-       }
-
-       @Override
-       public T nextRecord(T record) throws IOException {
-               if(!this.fetched) {
-                       // first record
-                       fetchNext();
-               }
-               if(!this.hasNext) {
-                       return null;
-               }
-               try {
-
-                       // get next HCatRecord
-                       HCatRecord v = this.recordReader.getCurrentValue();
-                       this.fetched = false;
-
-                       if(this.fieldNames.length > 0) {
-                               // return as Flink tuple
-                               return this.buildFlinkTuple(record, v);
-
-                       } else {
-                               // return as HCatRecord
-                               return (T)v;
-                       }
-
-               } catch (InterruptedException e) {
-                       throw new IOException("Could not get next record.", e);
-               }
-       }
-
-       protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
-
-       @Override
-       public void close() throws IOException {
-               this.recordReader.close();
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Custom de/serialization methods
-       // --------------------------------------------------------------------------------------------
-
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               out.writeInt(this.fieldNames.length);
-               for(String fieldName : this.fieldNames) {
-                       out.writeUTF(fieldName);
-               }
-               this.configuration.write(out);
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-               this.fieldNames = new String[in.readInt()];
-               for(int i=0; i<this.fieldNames.length; i++) {
-                       this.fieldNames[i] = in.readUTF();
-               }
-
-               Configuration configuration = new Configuration();
-               configuration.readFields(in);
-
-               if(this.configuration == null) {
-                       this.configuration = configuration;
-               }
-
-               this.hCatInputFormat = new org.apache.hive.hcatalog.mapreduce.HCatInputFormat();
-               this.outputSchema = (HCatSchema)HCatUtil.deserialize(this.configuration.get("mapreduce.lib.hcat.output.schema"));
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Result type business
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public TypeInformation<T> getProducedType() {
-               return this.resultType;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java b/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
deleted file mode 100644
index 46f3cd5..0000000
--- a/flink-batch-connectors/flink-hcatalog/src/main/java/org/apache/flink/hcatalog/java/HCatInputFormat.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog.java;
-
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.hcatalog.HCatInputFormatBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.data.HCatRecord;
-
-/**
- * An InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and 
partition filters.
- *
- * Data can be returned as {@link HCatRecord} or Flink {@link 
org.apache.flink.api.java.tuple.Tuple}.
- * Flink tuples support only up to 25 fields.
- *
- * @param <T>
- */
-public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
-       private static final long serialVersionUID = 1L;
-       /** Creates an instance through the no-argument base-class constructor. */
-       public HCatInputFormat() {}
-       /**
-        * Creates an instance by passing the database and table to the base class.
-        *
-        * @param database database argument forwarded to the base-class constructor
-        * @param table table argument forwarded to the base-class constructor
-        * @throws Exception declared because the base-class constructor is called
-        */
-       public HCatInputFormat(String database, String table) throws Exception {
-               super(database, table);
-       }
-       /**
-        * Creates an instance by passing the database, table and configuration to the base class.
-        *
-        * @param database database argument forwarded to the base-class constructor
-        * @param table table argument forwarded to the base-class constructor
-        * @param config Hadoop configuration forwarded to the base-class constructor
-        * @throws Exception declared because the base-class constructor is called
-        */
-       public HCatInputFormat(String database, String table, Configuration 
config) throws Exception {
-               super(database, table, config);
-       }
-
-       /** Returns 25 as the maximum tuple size for this Java variant. */
-       @Override
-       protected int getMaxFlinkTupleSize() {
-               return 25;
-       }
-       /**
-        * Copies the fields of an HCatRecord into the given tuple.
-        *
-        * For each index i the value is fetched by name ({@code fieldNames[i]}) and
-        * written to tuple position i. The conversion is selected by the type of
-        * {@code outputSchema.get(i)}: a String value is parsed for INT, TINYINT,
-        * SMALLINT, BIGINT, BOOLEAN, FLOAT and DOUBLE, stored unchanged for STRING,
-        * and rejected for BINARY, ARRAY, MAP and STRUCT. Non-String values are
-        * stored unchanged.
-        *
-        * NOTE(review): the value is looked up by name but its type by position;
-        * this assumes outputSchema is ordered like fieldNames - confirm in the base class.
-        *
-        * @param t target object; it is cast to a Flink Tuple and filled in place
-        * @param record record to read the field values from
-        * @return the same object that was passed as {@code t}
-        * @throws HCatException declared; presumably raised by the record lookup
-        * @throws RuntimeException if a String value arrives for a BINARY, ARRAY, MAP
-        *         or STRUCT column, or if the column type matches no case
-        */
-       @Override
-       protected T buildFlinkTuple(T t, HCatRecord record) throws 
HCatException {
-               // View the target as a Flink Tuple so that fields can be set by position.
-               Tuple tuple = (Tuple)t;
-
-               // Extract all fields from HCatRecord
-               for(int i=0; i < this.fieldNames.length; i++) {
-
-                       // get field value
-                       Object o = record.get(this.fieldNames[i], 
this.outputSchema);
-
-                       // Set field value in Flink tuple.
-                       // Partition columns are returned as String and
-                       //   need to be converted to original type.
-                       switch(this.outputSchema.get(i).getType()) {
-                               case INT:
-                                       if(o instanceof String) {
-                                               
tuple.setField(Integer.parseInt((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case TINYINT:
-                                       if(o instanceof String) {
-                                               
tuple.setField(Byte.parseByte((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case SMALLINT:
-                                       if(o instanceof String) {
-                                               
tuple.setField(Short.parseShort((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case BIGINT:
-                                       if(o instanceof String) {
-                                               
tuple.setField(Long.parseLong((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case BOOLEAN:
-                                       if(o instanceof String) {
-                                               
tuple.setField(Boolean.parseBoolean((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case FLOAT:
-                                       if(o instanceof String) {
-                                               
tuple.setField(Float.parseFloat((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case DOUBLE:
-                                       if(o instanceof String) {
-                                               
tuple.setField(Double.parseDouble((String) o), i);
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case STRING:
-                                       tuple.setField(o, i);
-                                       break;
-                               case BINARY:
-                                       if(o instanceof String) {
-                                               throw new 
RuntimeException("Cannot handle partition keys of type BINARY.");
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case ARRAY:
-                                       if(o instanceof String) {
-                                               throw new 
RuntimeException("Cannot handle partition keys of type ARRAY.");
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case MAP:
-                                       if(o instanceof String) {
-                                               throw new 
RuntimeException("Cannot handle partition keys of type MAP.");
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               case STRUCT:
-                                       if(o instanceof String) {
-                                               throw new 
RuntimeException("Cannot handle partition keys of type STRUCT.");
-                                       } else {
-                                               tuple.setField(o, i);
-                                       }
-                                       break;
-                               default:
-                                       throw new RuntimeException("Invalid 
Type");
-                       }
-               }
-
-               return (T)tuple;
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
 
b/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
deleted file mode 100644
index 0299ee1..0000000
--- 
a/flink-batch-connectors/flink-hcatalog/src/main/scala/org/apache/flink/hcatalog/scala/HCatInputFormat.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hcatalog.scala
-
-import org.apache.flink.configuration
-import org.apache.flink.hcatalog.HCatInputFormatBase
-import org.apache.hadoop.conf.Configuration
-import org.apache.hive.hcatalog.data.HCatRecord
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema
-
-/**
- * An InputFormat to read from HCatalog tables.
- * The InputFormat supports projection (selection and order of fields) and 
partition filters.
- *
- * Data can be returned as [[HCatRecord]] or Scala tuples.
- * Scala tuples support only up to 22 fields.
- *
- */
-class HCatInputFormat[T](
-                        database: String,
-                        table: String,
-                        config: Configuration
-                          ) extends HCatInputFormatBase[T](database, table, 
config) {
-  /** Auxiliary constructor that supplies a newly created Hadoop Configuration. */
-  def this(database: String, table: String) {
-    this(database, table, new Configuration)
-  }
-  // Buffer for one record's converted values; sized in configure, filled in buildFlinkTuple.
-  var vals: Array[Any] = Array[Any]()
-  /** Runs the base-class configure, then allocates `vals` with one slot per field name. */
-  override def configure(parameters: configuration.Configuration): Unit = {
-    super.configure(parameters)
-    vals = new Array[Any](fieldNames.length)
-  }
-  /** Returns 22, the largest tuple arity that createScalaTuple handles. */
-  override protected def getMaxFlinkTupleSize: Int = 22
-  /**
-   * Reads the fields of `record` into `vals` and returns them as a Scala tuple.
-   *
-   * Each value is fetched by name (`fieldNames(i)`); the conversion is selected by the
-   * type of `outputSchema.get(i)`. A String value is parsed for INT, TINYINT, SMALLINT,
-   * BIGINT, BOOLEAN, FLOAT and DOUBLE, stored unchanged for STRING, and rejected with a
-   * RuntimeException for BINARY, ARRAY, MAP and STRUCT. Any other column type also
-   * raises a RuntimeException. The parameter `t` is not used by this method.
-   *
-   * NOTE(review): TINYINT and SMALLINT strings are parsed with `toInt` and then narrowed
-   * with `toByte` / `toShort`, which does not range-check - confirm this is intended.
-   * NOTE(review): `List` and `Map` are not imported in this file's header, so the casts
-   * presumably target Scala's own collections - confirm that this matches the objects
-   * HCatalog returns for ARRAY, MAP and STRUCT columns.
-   */
-  override protected def buildFlinkTuple(t: T, record: HCatRecord): T = {
-
-    // Extract all fields from HCatRecord
-    var i: Int = 0
-    while (i < this.fieldNames.length) {
-        // Fetch the value by field name.
-        val o: AnyRef = record.get(this.fieldNames(i), this.outputSchema)
-
-        // partition columns are returned as String
-        //   Check and convert to actual type.
-        this.outputSchema.get(i).getType match {
-          case HCatFieldSchema.Type.INT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toInt
-            }
-            else {
-              vals(i) = o.asInstanceOf[Int]
-            }
-          case HCatFieldSchema.Type.TINYINT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toInt.toByte
-            }
-            else {
-              vals(i) = o.asInstanceOf[Byte]
-            }
-          case HCatFieldSchema.Type.SMALLINT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toInt.toShort
-            }
-            else {
-              vals(i) = o.asInstanceOf[Short]
-            }
-          case HCatFieldSchema.Type.BIGINT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toLong
-            }
-            else {
-              vals(i) = o.asInstanceOf[Long]
-            }
-          case HCatFieldSchema.Type.BOOLEAN =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toBoolean
-            }
-            else {
-              vals(i) = o.asInstanceOf[Boolean]
-            }
-          case HCatFieldSchema.Type.FLOAT =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toFloat
-            }
-            else {
-              vals(i) = o.asInstanceOf[Float]
-            }
-          case HCatFieldSchema.Type.DOUBLE =>
-            if (o.isInstanceOf[String]) {
-              vals(i) = o.asInstanceOf[String].toDouble
-            }
-            else {
-              vals(i) = o.asInstanceOf[Double]
-            }
-          case HCatFieldSchema.Type.STRING =>
-            vals(i) = o
-          case HCatFieldSchema.Type.BINARY =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type 
BINARY.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[Array[Byte]]
-            }
-          case HCatFieldSchema.Type.ARRAY =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type 
ARRAY.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[List[Object]]
-            }
-          case HCatFieldSchema.Type.MAP =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type 
MAP.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[Map[Object, Object]]
-            }
-          case HCatFieldSchema.Type.STRUCT =>
-            if (o.isInstanceOf[String]) {
-              throw new RuntimeException("Cannot handle partition keys of type 
STRUCT.")
-            }
-            else {
-              vals(i) = o.asInstanceOf[List[Object]]
-            }
-          case _ =>
-            throw new RuntimeException("Invalid type " + 
this.outputSchema.get(i).getType +
-              " encountered.")
-        }
-
-        i += 1
-      }
-    createScalaTuple(vals)
-  }
-  /**
-   * Builds a TupleN from the first N entries of `vals`, where N is `fieldNames.length`,
-   * and casts it to `T`. Any N outside 1 to 22 raises a RuntimeException.
-   */
-  private def createScalaTuple(vals: Array[Any]): T = {
-
-    this.fieldNames.length match {
-      case 1 =>
-        new Tuple1(vals(0)).asInstanceOf[T]
-      case 2 =>
-        new Tuple2(vals(0), vals(1)).asInstanceOf[T]
-      case 3 =>
-        new Tuple3(vals(0), vals(1), vals(2)).asInstanceOf[T]
-      case 4 =>
-        new Tuple4(vals(0), vals(1), vals(2), vals(3)).asInstanceOf[T]
-      case 5 =>
-        new Tuple5(vals(0), vals(1), vals(2), vals(3), vals(4)).asInstanceOf[T]
-      case 6 =>
-        new Tuple6(vals(0), vals(1), vals(2), vals(3), vals(4), 
vals(5)).asInstanceOf[T]
-      case 7 =>
-        new Tuple7(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6)).asInstanceOf[T]
-      case 8 =>
-        new Tuple8(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7))
-          .asInstanceOf[T]
-      case 9 =>
-        new Tuple9(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8)).asInstanceOf[T]
-      case 10 =>
-        new Tuple10(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9)).asInstanceOf[T]
-      case 11 =>
-        new Tuple11(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10)).asInstanceOf[T]
-      case 12 =>
-        new Tuple12(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11)).asInstanceOf[T]
-      case 13 =>
-        new Tuple13(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12)).asInstanceOf[T]
-      case 14 =>
-        new Tuple14(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), 
vals(13)).asInstanceOf[T]
-      case 15 =>
-        new Tuple15(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), 
vals(14)).asInstanceOf[T]
-      case 16 =>
-        new Tuple16(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15))
-          .asInstanceOf[T]
-      case 17 =>
-        new Tuple17(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
-          vals(16)).asInstanceOf[T]
-      case 18 =>
-        new Tuple18(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
-          vals(16), vals(17)).asInstanceOf[T]
-      case 19 =>
-        new Tuple19(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
-          vals(16), vals(17), vals(18)).asInstanceOf[T]
-      case 20 =>
-        new Tuple20(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
-          vals(16), vals(17), vals(18), vals(19)).asInstanceOf[T]
-      case 21 =>
-        new Tuple21(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
-          vals(16), vals(17), vals(18), vals(19), vals(20)).asInstanceOf[T]
-      case 22 =>
-        new Tuple22(vals(0), vals(1), vals(2), vals(3), vals(4), vals(5), 
vals(6), vals(7),
-          vals(8), vals(9), vals(10), vals(11), vals(12), vals(13), vals(14), 
vals(15),
-          vals(16), vals(17), vals(18), vals(19), vals(20), 
vals(21)).asInstanceOf[T]
-      case _ =>
-        throw new RuntimeException("Only up to 22 fields supported for Scala 
Tuples.")
-
-  }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-batch-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/pom.xml 
b/flink-batch-connectors/flink-jdbc/pom.xml
deleted file mode 100644
index 40779ba..0000000
--- a/flink-batch-connectors/flink-jdbc/pom.xml
+++ /dev/null
@@ -1,66 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
-       
-       <modelVersion>4.0.0</modelVersion>
-       
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-batch-connectors</artifactId>
-               <version>1.2-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-jdbc</artifactId>
-       <name>flink-jdbc</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-clients_2.10</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.derby</groupId>
-                       <artifactId>derby</artifactId>
-                       <version>10.10.1.1</version>
-                       <scope>test</scope>
-               </dependency>
-       </dependencies>
-</project>

Reply via email to