Repository: flink
Updated Branches:
  refs/heads/master 9e17cbd6b -> a079259f3


[FLINK-4311] [hbase] Fix TableInputFormat to correctly process multiple input 
splits.

This closes #2330


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f872792
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f872792
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f872792

Branch: refs/heads/master
Commit: 3f8727921e944d1d89714f5885c2de63681d51b2
Parents: 9e17cbd
Author: Niels Basjes <[email protected]>
Authored: Wed Aug 3 14:54:34 2016 +0200
Committer: Fabian Hueske <[email protected]>
Committed: Mon Oct 10 20:36:20 2016 +0200

----------------------------------------------------------------------
 flink-batch-connectors/flink-hbase/pom.xml      |  84 ++++++-
 .../flink/addons/hbase/TableInputFormat.java    | 171 +++++++------
 .../hbase/HBaseTestingClusterAutostarter.java   | 238 +++++++++++++++++++
 .../addons/hbase/TableInputFormatITCase.java    | 120 ++++++++++
 .../src/test/resources/hbase-site.xml           |  43 ----
 .../src/test/resources/log4j-test.properties    |  23 ++
 .../src/test/resources/log4j.properties         |  23 --
 .../api/common/io/ReplicatingInputFormat.java   |   4 +-
 .../flink/api/common/io/RichInputFormat.java    |   8 +-
 .../source/ContinuousFileReaderOperator.java    |   9 +-
 10 files changed, 579 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hbase/pom.xml 
b/flink-batch-connectors/flink-hbase/pom.xml
index 10c20a0..62e77d6 100644
--- a/flink-batch-connectors/flink-hbase/pom.xml
+++ b/flink-batch-connectors/flink-hbase/pom.xml
@@ -35,9 +35,36 @@ under the License.
 
        <properties>
                <hbase.hadoop1.version>0.98.11-hadoop1</hbase.hadoop1.version>
-               <hbase.hadoop2.version>0.98.11-hadoop2</hbase.hadoop2.version>
+               <hbase.hadoop2.version>1.1.2</hbase.hadoop2.version>
        </properties>
 
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <version>2.19.1</version>
+                               <configuration>
+                                       <!-- Enforce single fork execution due 
to heavy mini cluster use in the tests -->
+                                       <forkCount>1</forkCount>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <!-- Disable inherited 
shade-flink because of a problem in the shade plugin -->
+                                               <!-- When enabled you'll run 
into an infinite loop creating the dependency-reduced-pom.xml -->
+                                               <!-- Seems similar to 
https://issues.apache.org/jira/browse/MSHADE-148 -->
+                                               <id>shade-flink</id>
+                                               <phase>none</phase>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
        <dependencies>
 
                <!-- core dependencies -->
@@ -48,26 +75,34 @@ under the License.
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
-               
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
-               
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>${shading-artifact.name}</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
-               
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-java_2.10</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
+
+                       <!--Exclude Guava in order to run the HBaseMiniCluster 
during testing-->
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>com.google.guava</groupId>
+                                       <artifactId>guava</artifactId>
+                               </exclusion>
+                       </exclusions>
                </dependency>
 
                <!-- HBase server needed for TableOutputFormat -->
@@ -163,6 +198,45 @@ under the License.
                        </exclusions>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-server</artifactId>
+                       <version>${hbase.version}</version>
+                       <classifier>tests</classifier>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minicluster</artifactId>
+                       <version>${hadoop.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-hdfs</artifactId>
+                       <version>${hadoop.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-hadoop-compat</artifactId>
+                       <version>${hbase.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hbase</groupId>
+                       <artifactId>hbase-hadoop2-compat</artifactId>
+                       <version>${hbase.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
        </dependencies>
 
        <profiles>
@@ -178,7 +252,7 @@ under the License.
                                
<hbase.version>${hbase.hadoop1.version}</hbase.version>
                        </properties>
                </profile>
-               
+
                <profile>
                        <id>hadoop-2</id>
                        <repositories>

http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
 
b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 6ba6217..35b0a7c 100644
--- 
a/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ 
b/flink-batch-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -17,13 +17,9 @@
  */
 package org.apache.flink.addons.hbase;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
@@ -38,11 +34,14 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * {@link InputFormat} subclass that wraps the access for HTables.
- *
  */
-public abstract class TableInputFormat<T extends Tuple> extends 
RichInputFormat<T, TableInputSplit>{
+public abstract class TableInputFormat<T extends Tuple> extends 
RichInputFormat<T, TableInputSplit> {
 
        private static final long serialVersionUID = 1L;
 
@@ -51,34 +50,56 @@ public abstract class TableInputFormat<T extends Tuple> 
extends RichInputFormat<
        /** helper variable to decide whether the input is exhausted or not */
        private boolean endReached = false;
 
-       // TODO table and scan could be serialized when kryo serializer will be 
the default
-       protected transient HTable table;
-       protected transient Scan scan;
+       protected transient HTable table = null;
+       protected transient Scan scan = null;
 
        /** HBase iterator wrapper */
-       private ResultScanner rs;
+       private ResultScanner resultScanner = null;
 
        private byte[] lastRow;
        private int scannedRows;
 
-       // abstract methods allow for multiple table and scanners in the same 
job
+       /**
+        * Returns an instance of Scan that retrieves the required subset of 
records from the HBase table.
+        * @return The appropriate instance of Scan for this use case.
+        */
        protected abstract Scan getScanner();
+
+       /**
+        * What table is to be read.
+        * Per instance of a TableInputFormat derivative only a single 
tablename is possible.
+        * @return The name of the table
+        */
        protected abstract String getTableName();
+
+       /**
+        * The output from HBase is always an instance of {@link Result}.
+        * This method is to copy the data in the Result instance into the 
required {@link Tuple}
+        * @param r The Result instance from HBase that needs to be converted
+        * @return The appropriate instance of {@link Tuple} that contains the 
needed information.
+        */
        protected abstract T mapResultToTuple(Result r);
 
        /**
-        * creates a {@link Scan} object and a {@link HTable} connection
+        * Creates a {@link Scan} object and opens the {@link HTable} 
connection.
+        * These are opened here because they are needed in the 
createInputSplits
+        * which is called before the openInputFormat method.
+        * So the connection is opened in {@link #configure(Configuration)} and 
closed in {@link #closeInputFormat()}.
         *
-        * @param parameters
+        * @param parameters The configuration that is to be used
         * @see Configuration
         */
        @Override
        public void configure(Configuration parameters) {
-               this.table = createTable();
-               this.scan = getScanner();
+               table = createTable();
+               if (table != null) {
+                       scan = getScanner();
+               }
        }
 
-       /** Create an {@link HTable} instance and set it into this format */
+       /**
+        * Create an {@link HTable} instance and set it into this format
+        */
        private HTable createTable() {
                LOG.info("Initializing HBaseConfiguration");
                //use files found in the classpath
@@ -93,32 +114,51 @@ public abstract class TableInputFormat<T extends Tuple> 
extends RichInputFormat<
        }
 
        @Override
+       public void open(TableInputSplit split) throws IOException {
+               if (table == null) {
+                       throw new IOException("The HBase table has not been 
opened!");
+               }
+               if (scan == null) {
+                       throw new IOException("getScanner returned null");
+               }
+               if (split == null) {
+                       throw new IOException("Input split is null!");
+               }
+
+               logSplitInfo("opening", split);
+               scan.setStartRow(split.getStartRow());
+               lastRow = split.getEndRow();
+               scan.setStopRow(lastRow);
+
+               resultScanner = table.getScanner(scan);
+               endReached = false;
+               scannedRows = 0;
+       }
+
+       @Override
        public boolean reachedEnd() throws IOException {
-               return this.endReached;
+               return endReached;
        }
 
        @Override
        public T nextRecord(T reuse) throws IOException {
-               if (this.rs == null){
+               if (resultScanner == null) {
                        throw new IOException("No table result scanner 
provided!");
                }
-               try{
-                       Result res = this.rs.next();
-                       if (res != null){
+               try {
+                       Result res = resultScanner.next();
+                       if (res != null) {
                                scannedRows++;
                                lastRow = res.getRow();
                                return mapResultToTuple(res);
                        }
-               }catch (Exception e) {
-                       this.rs.close();
+               } catch (Exception e) {
+                       resultScanner.close();
                        //workaround for timeout on scan
-                       StringBuffer logMsg = new StringBuffer("Error after 
scan of ")
-                                       .append(scannedRows)
-                                       .append(" rows. Retry with a new 
scanner...");
-                       LOG.warn(logMsg.toString(), e);
-                       this.scan.setStartRow(lastRow);
-                       this.rs = table.getScanner(scan);
-                       Result res = this.rs.next();
+                       LOG.warn("Error after scan of " + scannedRows + " rows. 
Retry with a new scanner...", e);
+                       scan.setStartRow(lastRow);
+                       resultScanner = table.getScanner(scan);
+                       Result res = resultScanner.next();
                        if (res != null) {
                                scannedRows++;
                                lastRow = res.getRow();
@@ -126,46 +166,43 @@ public abstract class TableInputFormat<T extends Tuple> 
extends RichInputFormat<
                        }
                }
 
-               this.endReached = true;
+               endReached = true;
                return null;
        }
 
        @Override
-       public void open(TableInputSplit split) throws IOException {
-               if (split == null){
-                       throw new IOException("Input split is null!");
-               }
-               if (table == null){
-                       throw new IOException("No HTable provided!");
-               }
-               if (scan == null){
-                       throw new IOException("No Scan instance provided");
+       public void close() throws IOException {
+               LOG.info("Closing split (scanned {} rows)", scannedRows);
+               lastRow = null;
+               try {
+                       if (resultScanner != null) {
+                               resultScanner.close();
+                       }
+               } finally {
+                       resultScanner = null;
                }
-
-               logSplitInfo("opening", split);
-               scan.setStartRow(split.getStartRow());
-               lastRow = split.getEndRow();
-               scan.setStopRow(lastRow);
-
-               this.rs = table.getScanner(scan);
-               this.endReached = false;
-               this.scannedRows = 0;
        }
 
        @Override
-       public void close() throws IOException {
-               if(rs!=null){
-                       this.rs.close();
-               }
-               if(table!=null){
-                       this.table.close();
+       public void closeInputFormat() throws IOException {
+               try {
+                       if (table != null) {
+                               table.close();
+                       }
+               } finally {
+                       table = null;
                }
-               LOG.info("Closing split (scanned {} rows)", scannedRows);
-               this.lastRow = null;
        }
 
        @Override
        public TableInputSplit[] createInputSplits(final int minNumSplits) 
throws IOException {
+               if (table == null) {
+                       throw new IOException("The HBase table has not been 
opened!");
+               }
+               if (scan == null) {
+                       throw new IOException("getScanner returned null");
+               }
+
                //Gets the starting and ending row keys for every region in the 
currently open table
                final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
                if (keys == null || keys.getFirst() == null || 
keys.getFirst().length == 0) {
@@ -186,16 +223,16 @@ public abstract class TableInputFormat<T extends Tuple> 
extends RichInputFormat<
                                continue;
                        }
                        //Finds the region on which the given row is being 
served
-                       final String[] hosts = new String[] { regionLocation };
+                       final String[] hosts = new String[]{regionLocation};
 
                        // determine if regions contains keys used by the scan
                        boolean isLastRegion = endKey.length == 0;
                        if ((scanWithNoLowerBound || isLastRegion || 
Bytes.compareTo(startRow, endKey) < 0) &&
-                                       (scanWithNoUpperBound || 
Bytes.compareTo(stopRow, startKey) > 0)) {
+                               (scanWithNoUpperBound || 
Bytes.compareTo(stopRow, startKey) > 0)) {
 
                                final byte[] splitStart = scanWithNoLowerBound 
|| Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
                                final byte[] splitStop = (scanWithNoUpperBound 
|| Bytes.compareTo(endKey, stopRow) <= 0)
-                                               && !isLastRegion ? endKey : 
stopRow;
+                                       && !isLastRegion ? endKey : stopRow;
                                int id = splits.size();
                                final TableInputSplit split = new 
TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
                                splits.add(split);
@@ -215,7 +252,7 @@ public abstract class TableInputFormat<T extends Tuple> 
extends RichInputFormat<
                String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
                String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
                String[] hostnames = split.getHostnames();
-               LOG.info("{} split [{}|{}|{}|{}]",action, splitId, hostnames, 
splitStartKey, splitStopKey);
+               LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, 
splitId, hostnames, splitStartKey, splitStopKey);
        }
 
        /**
@@ -231,13 +268,11 @@ public abstract class TableInputFormat<T extends Tuple> 
extends RichInputFormat<
         * Override this method, if you want to bulk exclude regions altogether 
from M-R. By default, no region is excluded(
         * i.e. all regions are included).
         *
-        * @param startKey
-        *        Start key of the region
-        * @param endKey
-        *        End key of the region
+        * @param startKey Start key of the region
+        * @param endKey   End key of the region
         * @return true, if this region needs to be included as part of the 
input (default).
         */
-       private static boolean includeRegionInSplit(final byte[] startKey, 
final byte[] endKey) {
+       protected boolean includeRegionInSplit(final byte[] startKey, final 
byte[] endKey) {
                return true;
        }
 
@@ -251,4 +286,4 @@ public abstract class TableInputFormat<T extends Tuple> 
extends RichInputFormat<
                return null;
        }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
 
b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
new file mode 100644
index 0000000..3d9f672
--- /dev/null
+++ 
b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * By using this class as the super class of a set of tests you will have a 
HBase testing
+ * cluster available that is very suitable for writing tests for scanning and 
filtering against.
+ * This is usable by any downstream application because the HBase cluster is 
'injected' because
+ * a dynamically generated hbase-site.xml is added to the classpath.
+ * Because of this classpath manipulation it is not possible to start a second 
testing cluster in the same JVM.
+ * So if you have this you should either put all hbase related tests in a 
single class or force surefire to
+ * setup a new JVM for each testclass.
+ * See: 
http://maven.apache.org/surefire/maven-surefire-plugin/examples/fork-options-and-parallel-execution.html
+ */
+//
+// NOTE: The code in this file is based on code from the
+// Apache HBase project, licensed under the Apache License v 2.0
+//
+// 
https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
+//
+public class HBaseTestingClusterAutostarter implements Serializable {
+
+       private static final Log LOG = 
LogFactory.getLog(HBaseTestingClusterAutostarter.class);
+
+       private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+       private static HBaseAdmin admin = null;
+       private static List<TableName> createdTables = new ArrayList<>();
+
+       private static boolean alreadyRegisteredTestCluster = false;
+
+       protected static void createTable(TableName tableName, byte[] 
columnFamilyName, byte[][] splitKeys) {
+               LOG.info("HBase minicluster: Creating table " + 
tableName.getNameAsString());
+
+               assertNotNull("HBaseAdmin is not initialized successfully.", 
admin);
+               HTableDescriptor desc = new HTableDescriptor(tableName);
+               HColumnDescriptor colDef = new 
HColumnDescriptor(columnFamilyName);
+               desc.addFamily(colDef);
+
+               try {
+                       admin.createTable(desc, splitKeys);
+                       createdTables.add(tableName);
+                       assertTrue("Fail to create the table", 
admin.tableExists(tableName));
+               } catch (IOException e) {
+                       assertNull("Exception found while creating table", e);
+               }
+       }
+
+       protected static HTable openTable(TableName tableName) throws 
IOException {
+               HTable table = (HTable) 
admin.getConnection().getTable(tableName);
+               assertTrue("Fail to create the table", 
admin.tableExists(tableName));
+               return table;
+       }
+
+       private static void deleteTables() {
+               if (admin != null) {
+                       for (TableName tableName : createdTables) {
+                               try {
+                                       if (admin.tableExists(tableName)) {
+                                               admin.disableTable(tableName);
+                                               admin.deleteTable(tableName);
+                                       }
+                               } catch (IOException e) {
+                                       assertNull("Exception found deleting 
the table", e);
+                               }
+                       }
+               }
+       }
+
+       private static void initialize(Configuration conf) {
+               conf = HBaseConfiguration.create(conf);
+               conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+               try {
+                       admin = TEST_UTIL.getHBaseAdmin();
+               } catch (MasterNotRunningException e) {
+                       assertNull("Master is not running", e);
+               } catch (ZooKeeperConnectionException e) {
+                       assertNull("Cannot connect to ZooKeeper", e);
+               } catch (IOException e) {
+                       assertNull("IOException", e);
+               }
+       }
+
+       @BeforeClass
+       public static void setUp() throws Exception {
+               LOG.info("HBase minicluster: Starting");
+               ((Log4JLogger) RpcServer.LOG).getLogger().setLevel(Level.ALL);
+               ((Log4JLogger) 
AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
+               ((Log4JLogger) 
ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
+
+               TEST_UTIL.startMiniCluster(1);
+
+               // https://issues.apache.org/jira/browse/HBASE-11711
+               TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 
-1);
+
+               // Make sure the zookeeper quorum value contains the right port 
number (varies per run).
+               TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", 
"localhost:" + TEST_UTIL.getZkCluster().getClientPort());
+
+               initialize(TEST_UTIL.getConfiguration());
+               LOG.info("HBase minicluster: Running");
+       }
+
+       private static File hbaseSiteXmlDirectory;
+       private static File hbaseSiteXmlFile;
+
+       /**
+        * This dynamically generates a hbase-site.xml file that is added to 
the classpath.
+        * This way this HBaseMinicluster can be used by an unmodified 
application.
+        * The downside is that this cannot be 'unloaded' so you can have only 
one per JVM.
+        */
+       public static void registerHBaseMiniClusterInClasspath() {
+               if (alreadyRegisteredTestCluster) {
+                       fail("You CANNOT register a second HBase Testing 
cluster in the classpath of the SAME JVM");
+               }
+               File baseDir = new File(System.getProperty("java.io.tmpdir", 
"/tmp/"));
+               hbaseSiteXmlDirectory = new File(baseDir, 
"unittest-hbase-minicluster-" + Math.abs(new Random().nextLong()) + "/");
+
+               if (!hbaseSiteXmlDirectory.mkdirs()) {
+                       fail("Unable to create output directory " + 
hbaseSiteXmlDirectory + " for the HBase minicluster");
+               }
+
+               assertNotNull("The ZooKeeper for the HBase minicluster is 
missing", TEST_UTIL.getZkCluster());
+
+               createHBaseSiteXml(hbaseSiteXmlDirectory, 
TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum"));
+               addDirectoryToClassPath(hbaseSiteXmlDirectory);
+
+               // Avoid starting it again.
+               alreadyRegisteredTestCluster = true;
+       }
+
+       private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, 
String zookeeperQuorum) {
+               hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, 
"hbase-site.xml");
+               // Create the hbase-site.xml file for this run.
+               try {
+                       String hbaseSiteXml = "<?xml version=\"1.0\"?>\n" +
+                               "<?xml-stylesheet type=\"text/xsl\" 
href=\"configuration.xsl\"?>\n" +
+                               "<configuration>\n" +
+                               "  <property>\n" +
+                               "    <name>hbase.zookeeper.quorum</name>\n" +
+                               "    <value>" + zookeeperQuorum + "</value>\n" +
+                               "  </property>\n" +
+                               "</configuration>";
+                       OutputStream fos = new 
FileOutputStream(hbaseSiteXmlFile);
+                       
fos.write(hbaseSiteXml.getBytes(StandardCharsets.UTF_8));
+                       fos.close();
+               } catch (IOException e) {
+                       fail("Unable to create " + hbaseSiteXmlFile);
+               }
+       }
+
+       private static void addDirectoryToClassPath(File directory) {
+               try {
+                       // Get the classloader actually used by 
HBaseConfiguration
+                       ClassLoader classLoader = 
HBaseConfiguration.create().getClassLoader();
+                       if (!(classLoader instanceof URLClassLoader)) {
+                               fail("We should get a URLClassLoader");
+                       }
+
+                       // Make the addURL method accessible
+                       Method method = 
URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+                       method.setAccessible(true);
+
+                       // Add the directory where we put the hbase-site.xml to 
the classpath
+                       method.invoke(classLoader, directory.toURI().toURL());
+               } catch (MalformedURLException | NoSuchMethodException | 
IllegalAccessException | InvocationTargetException e) {
+                       fail("Unable to add " + directory + " to classpath 
because of this exception: " + e.getMessage());
+               }
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               LOG.info("HBase minicluster: Shutting down");
+               deleteTables();
+               hbaseSiteXmlFile.delete();
+               hbaseSiteXmlDirectory.delete();
+               TEST_UTIL.shutdownMiniCluster();
+               LOG.info("HBase minicluster: Down");
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
 
b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
new file mode 100644
index 0000000..3dddd88
--- /dev/null
+++ 
b/flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TableInputFormatITCase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TableInputFormatITCase extends HBaseTestingClusterAutostarter {
+       private static final String TEST_TABLE_NAME = 
"TableInputFormatTestTable";
+       private static final byte[] TEST_TABLE_FAMILY_NAME = "F".getBytes();
+       private static final byte[] TEST_TABLE_COLUMN_NAME = "Col".getBytes();
+
+       // These are the row ids AND also the values we will put in the test 
table
+       private static final String[] ROW_IDS = {"000", "111", "222", "333", 
"444", "555", "666", "777", "888", "999"};
+
+       @BeforeClass
+       public static void activateHBaseCluster(){
+               registerHBaseMiniClusterInClasspath();
+       }
+
+       @Before
+       public void createTestTable() throws IOException {
+               TableName tableName = TableName.valueOf(TEST_TABLE_NAME);
+               byte[][] splitKeys = {"0".getBytes(), "3".getBytes(), 
"6".getBytes(), "9".getBytes()};
+               createTable(tableName, TEST_TABLE_FAMILY_NAME, splitKeys);
+               HTable table = openTable(tableName);
+
+               for (String rowId : ROW_IDS) {
+                       byte[] rowIdBytes = rowId.getBytes();
+                       Put p = new Put(rowIdBytes);
+                       // Use the rowId as the value to facilitate the testing 
better
+                       p.add(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME, 
rowIdBytes);
+                       table.put(p);
+               }
+
+               table.close();
+       }
+
+       class InputFormatForTestTable extends TableInputFormat<Tuple1<String>> {
+               @Override
+               protected Scan getScanner() {
+                       return new Scan();
+               }
+
+               @Override
+               protected String getTableName() {
+                       return TEST_TABLE_NAME;
+               }
+
+               @Override
+               protected Tuple1<String> mapResultToTuple(Result r) {
+                       return new Tuple1<>(new 
String(r.getValue(TEST_TABLE_FAMILY_NAME, TEST_TABLE_COLUMN_NAME)));
+               }
+       }
+
+       @Test
+       public void testTableInputFormat() {
+               ExecutionEnvironment environment = 
ExecutionEnvironment.getExecutionEnvironment();
+               environment.setParallelism(1);
+
+               DataSet<String> resultDataSet =
+                       environment.createInput(new 
InputFormatForTestTable()).map(new MapFunction<Tuple1<String>, String>() {
+                               @Override
+                               public String map(Tuple1<String> value) throws 
Exception {
+                                       return value.f0;
+                               }
+                       });
+
+               List<String> resultSet = new ArrayList<>();
+               resultDataSet.output(new 
LocalCollectionOutputFormat<>(resultSet));
+
+               try {
+                       environment.execute("HBase InputFormat Test");
+               } catch (Exception e) {
+                       Assert.fail("HBase InputFormat test failed. " + 
e.getMessage());
+               }
+
+               for (String rowId : ROW_IDS) {
+                       assertTrue("Missing rowId from table: " + rowId, 
resultSet.contains(rowId));
+               }
+
+               assertEquals("The number of records is wrong.", ROW_IDS.length, 
resultSet.size());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml 
b/flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml
deleted file mode 100644
index 2984063..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/resources/hbase-site.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
--->
-<configuration>
-
-  <property>
-    <name>hbase.tmp.dir</name>
-    <!-- 
-    <value>/media/Dati/hbase-0.98-data</value>
-    --> 
-    <value>/opt/hbase-0.98.6.1-hadoop2/data</value>
-
-  </property>
-  <property>
-    <name>hbase.zookeeper.quorum</name>
-    <value>localhost</value>
-  </property>
-    <!-- 
-  <property>
-    <name>hadoop.security.group.mapping</name>
-    <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
-  </property>
-  -->
-</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties 
b/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..804ff45
--- /dev/null
+++ 
b/flink-batch-connectors/flink-hbase/src/test/resources/log4j-test.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=DEBUG, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.threshold=INFO
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %-5p %30c{1}:%4L - 
%m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-batch-connectors/flink-hbase/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hbase/src/test/resources/log4j.properties 
b/flink-batch-connectors/flink-hbase/src/test/resources/log4j.properties
deleted file mode 100644
index d6eb2b2..0000000
--- a/flink-batch-connectors/flink-hbase/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-# http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=${hadoop.root.logger}
-hadoop.root.logger=INFO,console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{2}: %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
index 14dc8f4..c8aa591 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
@@ -135,14 +135,14 @@ public final class ReplicatingInputFormat<OT, S extends 
InputSplit> extends Rich
        }
 
        @Override
-       public void openInputFormat() {
+       public void openInputFormat() throws IOException {
                if (this.replicatedIF instanceof RichInputFormat) {
                        ((RichInputFormat)this.replicatedIF).openInputFormat();
                }
        }
 
        @Override
-       public void closeInputFormat() {
+       public void closeInputFormat() throws IOException {
                if (this.replicatedIF instanceof RichInputFormat) {
                        ((RichInputFormat)this.replicatedIF).closeInputFormat();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
index 6ab5cc1..95dc1cd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
@@ -23,6 +23,8 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.io.InputSplit;
 
+import java.io.IOException;
+
 /**
  * An abstract stub implementation for Rich input formats.
  * Rich formats have access to their runtime execution context via {@link 
#getRuntimeContext()}.
@@ -56,9 +58,10 @@ public abstract class RichInputFormat<OT, T extends 
InputSplit> implements Input
         * Resources should be allocated in this method. (e.g. database 
connections, cache, etc.)
         * 
         * @see InputFormat
+        * @throws IOException in case allocating the resources failed.
         */
        @PublicEvolving
-       public void openInputFormat() {
+       public void openInputFormat() throws IOException {
                //do nothing here, just for subclasses
        }
 
@@ -67,9 +70,10 @@ public abstract class RichInputFormat<OT, T extends 
InputSplit> implements Input
         * Resources allocated during {@link #openInputFormat()} should be 
closed in this method.
         * 
         * @see InputFormat
+        * @throws IOException in case closing the resources failed
         */
        @PublicEvolving
-       public void closeInputFormat() {
+       public void closeInputFormat() throws IOException {
                //do nothing here, just for subclasses
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3f872792/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 35e72a7..769cb6f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -326,12 +326,19 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                }
 
                        } catch (Throwable e) {
+
                                
getContainingTask().handleAsyncException("Caught exception when processing 
split: " + currentSplit, e);
+
                        } finally {
                                synchronized (checkpointLock) {
                                        LOG.info("Reader terminated, and 
exiting...");
 
-                                       this.format.closeInputFormat();
+                                       try {
+                                               this.format.closeInputFormat();
+                                       } catch (IOException e) {
+                                               
getContainingTask().handleAsyncException(
+                                                       "Caught exception from 
" + this.format.getClass().getName() + ".closeInputFormat() : " + 
e.getMessage(), e);
+                                       }
                                        this.isSplitOpen = false;
                                        this.currentSplit = null;
                                        this.isRunning = false;

Reply via email to