This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5d981ebc16ce6a1089e021fcb7a634ebe0167be5
Author: Jark Wu <[email protected]>
AuthorDate: Tue Jul 30 18:17:39 2019 +0800

    [FLINK-13290][hbase] Enable blink planner for integration tests of HBase
    
    This commit combines HBaseTableSourceITCase, HBaseLookupFunctionITCase, 
and HBaseConnectorITCase into one class.
    This saves a significant amount of cluster initialization time.
    
    This closes #9275
---
 flink-connectors/flink-hbase/pom.xml               |  27 +-
 .../flink/addons/hbase/HBaseTableSource.java       |   5 +-
 .../flink/addons/hbase/HBaseConnectorITCase.java   | 601 +++++++++++++--------
 .../addons/hbase/HBaseLookupFunctionITCase.java    | 203 -------
 .../apache/flink/addons/hbase/HBaseSinkITCase.java | 186 -------
 .../flink/addons/hbase/util/HBaseTestBase.java     | 150 +++++
 .../HBaseTestingClusterAutoStarter.java}           |  11 +-
 .../flink/addons/hbase/util/PlannerType.java       |  27 +
 8 files changed, 571 insertions(+), 639 deletions(-)

diff --git a/flink-connectors/flink-hbase/pom.xml 
b/flink-connectors/flink-hbase/pom.xml
index a9d3c22..eba6028 100644
--- a/flink-connectors/flink-hbase/pom.xml
+++ b/flink-connectors/flink-hbase/pom.xml
@@ -99,14 +99,6 @@ under the License.
                        <scope>provided</scope>
                        <optional>true</optional>
                </dependency>
-               <!-- A planner dependency won't be necessary once FLIP-32 has 
been completed. -->
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-                       <optional>true</optional>
-               </dependency>
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
@@ -299,6 +291,25 @@ under the License.
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
diff --git 
a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 
b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index 3fff5fa..b1e7161 100644
--- 
a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ 
b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -180,7 +180,10 @@ public class HBaseTableSource implements 
BatchTableSource<Row>, ProjectableTable
 
        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment 
execEnv) {
-               throw new UnsupportedOperationException("HBase table can not 
convert to DataStream currently.");
+               HBaseTableSchema projectedSchema = 
hbaseSchema.getProjectedHBaseTableSchema(projectFields);
+               return execEnv
+                       .createInput(new HBaseRowInputFormat(conf, tableName, 
projectedSchema), getReturnType())
+                       .name(explainSource());
        }
 
        @VisibleForTesting
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index bd992c5..6a0909e 100644
--- 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -1,6 +1,4 @@
 /*
- * Copyright The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,212 +18,138 @@
 
 package org.apache.flink.addons.hbase;
 
+import org.apache.flink.addons.hbase.util.HBaseTestBase;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.runtime.utils.TableUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-
+import java.util.Map;
+
+import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
+import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
+import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.junit.Assert.assertEquals;
 
 /**
- * This class contains integrations tests for multiple HBase connectors:
- * - TableInputFormat
- * - HBaseTableSource
- *
- * <p>These tests are located in a single test file to avoided unnecessary 
initializations of the
- * HBaseTestingCluster which takes about half a minute.
- *
+ * IT cases for HBase connector (including HBaseTableSource and 
HBaseTableSink).
  */
-public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
-
-       private static final String TEST_TABLE = "testTable";
-
-       private static final String FAMILY1 = "family1";
-       private static final String F1COL1 = "col1";
-
-       private static final String FAMILY2 = "family2";
-       private static final String F2COL1 = "col1";
-       private static final String F2COL2 = "col2";
-
-       private static final String FAMILY3 = "family3";
-       private static final String F3COL1 = "col1";
-       private static final String F3COL2 = "col2";
-       private static final String F3COL3 = "col3";
-
-       @BeforeClass
-       public static void activateHBaseCluster() throws IOException {
-               registerHBaseMiniClusterInClasspath();
-               prepareTable();
-               LimitNetworkBuffersTestEnvironment.setAsContext();
-       }
-
-       @AfterClass
-       public static void resetExecutionEnvironmentFactory() {
-               LimitNetworkBuffersTestEnvironment.unsetAsContext();
-       }
-
-       private static void prepareTable() throws IOException {
-
-               // create a table
-               TableName tableName = TableName.valueOf(TEST_TABLE);
-               // column families
-               byte[][] families = new byte[][]{
-                       Bytes.toBytes(FAMILY1),
-                       Bytes.toBytes(FAMILY2),
-                       Bytes.toBytes(FAMILY3)
-               };
-               // split keys
-               byte[][] splitKeys = new byte[][]{ Bytes.toBytes(4) };
-               createTable(tableName, families, splitKeys);
-
-               // get the HTable instance
-               HTable table = openTable(tableName);
-               List<Put> puts = new ArrayList<>();
-               // add some data
-               puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-               puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-               puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-               puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
-               puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-               puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-               puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-               puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
-
-               // append rows to table
-               table.put(puts);
-               table.close();
-       }
-
-       private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, 
double f3c1, boolean f3c2, String f3c3) {
-               Put put = new Put(Bytes.toBytes(rowKey));
-               // family 1
-               put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), 
Bytes.toBytes(f1c1));
-               // family 2
-               if (f2c1 != null) {
-                       put.addColumn(Bytes.toBytes(FAMILY2), 
Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
-               }
-               put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), 
Bytes.toBytes(f2c2));
-               // family 3
-               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), 
Bytes.toBytes(f3c1));
-               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), 
Bytes.toBytes(f3c2));
-               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), 
Bytes.toBytes(f3c3));
-
-               return put;
-       }
+public class HBaseConnectorITCase extends HBaseTestBase {
 
-       // ######## HBaseTableSource tests ############
+       // 
-------------------------------------------------------------------------------------
+       // HBaseTableSource tests
+       // 
-------------------------------------------------------------------------------------
 
        @Test
        public void testTableSourceFullScan() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(4);
-               BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, new TableConfig());
-               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE);
+               TableEnvironment tEnv = createBatchTableEnv();
+               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE_1);
                hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
                hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
                hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
                hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
                hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
                hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-               tableEnv.registerTableSource("hTable", hbaseTable);
-
-               Table result = tableEnv.sqlQuery(
-                       "SELECT " +
-                               "  h.family1.col1, " +
-                               "  h.family2.col1, " +
-                               "  h.family2.col2, " +
-                               "  h.family3.col1, " +
-                               "  h.family3.col2, " +
-                               "  h.family3.col3 " +
-                               "FROM hTable AS h"
-               );
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-
+               tEnv.registerTableSource("hTable", hbaseTable);
+
+               Table table = tEnv.sqlQuery("SELECT " +
+                       "  h.family1.col1, " +
+                       "  h.family2.col1, " +
+                       "  h.family2.col2, " +
+                       "  h.family3.col1, " +
+                       "  h.family3.col2, " +
+                       "  h.family3.col3 " +
+                       "FROM hTable AS h");
+
+               List<Row> results = collectBatchResult(table);
                String expected =
                        "10,Hello-1,100,1.01,false,Welt-1\n" +
-                       "20,Hello-2,200,2.02,true,Welt-2\n" +
-                       "30,Hello-3,300,3.03,false,Welt-3\n" +
-                       "40,null,400,4.04,true,Welt-4\n" +
-                       "50,Hello-5,500,5.05,false,Welt-5\n" +
-                       "60,Hello-6,600,6.06,true,Welt-6\n" +
-                       "70,Hello-7,700,7.07,false,Welt-7\n" +
-                       "80,null,800,8.08,true,Welt-8\n";
+                               "20,Hello-2,200,2.02,true,Welt-2\n" +
+                               "30,Hello-3,300,3.03,false,Welt-3\n" +
+                               "40,null,400,4.04,true,Welt-4\n" +
+                               "50,Hello-5,500,5.05,false,Welt-5\n" +
+                               "60,Hello-6,600,6.06,true,Welt-6\n" +
+                               "70,Hello-7,700,7.07,false,Welt-7\n" +
+                               "80,null,800,8.08,true,Welt-8\n";
 
                TestBaseUtils.compareResultAsText(results, expected);
        }
 
        @Test
        public void testTableSourceProjection() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(4);
-               BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, new TableConfig());
-               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE);
+               TableEnvironment tEnv = createBatchTableEnv();
+               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE_1);
                hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
                hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
                hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
                hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
                hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
                hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-               tableEnv.registerTableSource("hTable", hbaseTable);
+               tEnv.registerTableSource("hTable", hbaseTable);
 
-               Table result = tableEnv.sqlQuery(
-                       "SELECT " +
-                               "  h.family1.col1, " +
-                               "  h.family3.col1, " +
-                               "  h.family3.col2, " +
-                               "  h.family3.col3 " +
-                               "FROM hTable AS h"
-               );
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
+               Table table = tEnv.sqlQuery("SELECT " +
+                       "  h.family1.col1, " +
+                       "  h.family3.col1, " +
+                       "  h.family3.col2, " +
+                       "  h.family3.col3 " +
+                       "FROM hTable AS h");
 
+               List<Row> results = collectBatchResult(table);
                String expected =
                        "10,1.01,false,Welt-1\n" +
-                       "20,2.02,true,Welt-2\n" +
-                       "30,3.03,false,Welt-3\n" +
-                       "40,4.04,true,Welt-4\n" +
-                       "50,5.05,false,Welt-5\n" +
-                       "60,6.06,true,Welt-6\n" +
-                       "70,7.07,false,Welt-7\n" +
-                       "80,8.08,true,Welt-8\n";
+                               "20,2.02,true,Welt-2\n" +
+                               "30,3.03,false,Welt-3\n" +
+                               "40,4.04,true,Welt-4\n" +
+                               "50,5.05,false,Welt-5\n" +
+                               "60,6.06,true,Welt-6\n" +
+                               "70,7.07,false,Welt-7\n" +
+                               "80,8.08,true,Welt-8\n";
 
                TestBaseUtils.compareResultAsText(results, expected);
        }
 
        @Test
        public void testTableSourceFieldOrder() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(4);
-               BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, new TableConfig());
-               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE);
+               TableEnvironment tEnv = createBatchTableEnv();
+               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE_1);
                // shuffle order of column registration
                hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
                hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
@@ -233,68 +157,319 @@ public class HBaseConnectorITCase extends 
HBaseTestingClusterAutostarter {
                hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
                hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
                hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-               tableEnv.registerTableSource("hTable", hbaseTable);
+               tEnv.registerTableSource("hTable", hbaseTable);
 
-               Table result = tableEnv.sqlQuery(
-                       "SELECT * FROM hTable AS h"
-               );
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
+               Table table = tEnv.sqlQuery("SELECT * FROM hTable AS h");
 
+               List<Row> results = collectBatchResult(table);
                String expected =
                        "Hello-1,100,1.01,false,Welt-1,10\n" +
-                       "Hello-2,200,2.02,true,Welt-2,20\n" +
-                       "Hello-3,300,3.03,false,Welt-3,30\n" +
-                       "null,400,4.04,true,Welt-4,40\n" +
-                       "Hello-5,500,5.05,false,Welt-5,50\n" +
-                       "Hello-6,600,6.06,true,Welt-6,60\n" +
-                       "Hello-7,700,7.07,false,Welt-7,70\n" +
-                       "null,800,8.08,true,Welt-8,80\n";
+                               "Hello-2,200,2.02,true,Welt-2,20\n" +
+                               "Hello-3,300,3.03,false,Welt-3,30\n" +
+                               "null,400,4.04,true,Welt-4,40\n" +
+                               "Hello-5,500,5.05,false,Welt-5,50\n" +
+                               "Hello-6,600,6.06,true,Welt-6,60\n" +
+                               "Hello-7,700,7.07,false,Welt-7,70\n" +
+                               "null,800,8.08,true,Welt-8,80\n";
 
                TestBaseUtils.compareResultAsText(results, expected);
        }
 
        @Test
        public void testTableSourceReadAsByteArray() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(4);
-               BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, new TableConfig());
+               TableEnvironment tEnv = createBatchTableEnv();
                // fetch row2 from the table till the end
-               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE);
+               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE_1);
                hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class);
                hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class);
+               tEnv.registerTableSource("hTable", hbaseTable);
+               tEnv.registerFunction("toUTF8", new ToUTF8());
+               tEnv.registerFunction("toLong", new ToLong());
 
-               tableEnv.registerTableSource("hTable", hbaseTable);
-               tableEnv.registerFunction("toUTF8", new ToUTF8());
-               tableEnv.registerFunction("toLong", new ToLong());
-
-               Table result = tableEnv.sqlQuery(
+               Table table = tEnv.sqlQuery(
                        "SELECT " +
                                "  toUTF8(h.family2.col1), " +
                                "  toLong(h.family2.col2) " +
                                "FROM hTable AS h"
                );
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
 
+               List<Row> results = collectBatchResult(table);
                String expected =
                        "Hello-1,100\n" +
-                       "Hello-2,200\n" +
-                       "Hello-3,300\n" +
-                       "null,400\n" +
-                       "Hello-5,500\n" +
-                       "Hello-6,600\n" +
-                       "Hello-7,700\n" +
-                       "null,800\n";
+                               "Hello-2,200\n" +
+                               "Hello-3,300\n" +
+                               "null,400\n" +
+                               "Hello-5,500\n" +
+                               "Hello-6,600\n" +
+                               "Hello-7,700\n" +
+                               "null,800\n";
 
                TestBaseUtils.compareResultAsText(results, expected);
        }
 
+       @Test
+       public void testTableInputFormat() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Tuple1<Integer>> result = env
+                       .createInput(new InputFormatForTestTable())
+                       .reduce((ReduceFunction<Tuple1<Integer>>) (v1, v2) -> 
Tuple1.of(v1.f0 + v2.f0));
+
+               List<Tuple1<Integer>> resultSet = result.collect();
+
+               assertEquals(1, resultSet.size());
+               assertEquals(360, (int) resultSet.get(0).f0);
+       }
+
+       // 
-------------------------------------------------------------------------------------
+       // HBaseTableSink tests
+       // 
-------------------------------------------------------------------------------------
+
+       // prepare a source collection.
+       private static final List<Row> testData1 = new ArrayList<>();
+       private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
+               new TypeInformation[]{Types.INT, Types.INT, Types.STRING, 
Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
+               new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", 
"f3c3"});
+
+       static {
+               testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, 
"Welt-1"));
+               testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, 
"Welt-2"));
+               testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, 
"Welt-3"));
+               testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
+               testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, 
"Welt-5"));
+               testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, 
"Welt-6"));
+               testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, 
"Welt-7"));
+               testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
+       }
+
+       @Test
+       public void testTableSink() throws Exception {
+               HBaseTableSchema schema = new HBaseTableSchema();
+               schema.addColumn(FAMILY1, F1COL1, Integer.class);
+               schema.addColumn(FAMILY2, F2COL1, String.class);
+               schema.addColumn(FAMILY2, F2COL2, Long.class);
+               schema.setRowKey("rk", Integer.class);
+               schema.addColumn(FAMILY3, F3COL1, Double.class);
+               schema.addColumn(FAMILY3, F3COL2, Boolean.class);
+               schema.addColumn(FAMILY3, F3COL3, String.class);
+
+               Map<String, String> tableProperties = new HashMap<>();
+               tableProperties.put("connector.type", "hbase");
+               tableProperties.put("connector.version", "1.4.3");
+               tableProperties.put("connector.property-version", "1");
+               tableProperties.put("connector.table-name", TEST_TABLE_2);
+               tableProperties.put("connector.zookeeper.quorum", 
getZookeeperQuorum());
+               tableProperties.put("connector.zookeeper.znode.parent", 
"/hbase");
+               DescriptorProperties descriptorProperties = new 
DescriptorProperties(true);
+               descriptorProperties.putTableSchema(SCHEMA, 
schema.convertsToTableSchema());
+               descriptorProperties.putProperties(tableProperties);
+               TableSink tableSink = TableFactoryService
+                       .find(HBaseTableFactory.class, 
descriptorProperties.asMap())
+                       .createTableSink(descriptorProperties.asMap());
+
+               StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(execEnv, streamSettings);
+
+               DataStream<Row> ds = 
execEnv.fromCollection(testData1).returns(testTypeInfo1);
+               tEnv.registerDataStream("src", ds);
+               tEnv.registerTableSink("hbase", tableSink);
+
+               String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, 
f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
+               tEnv.sqlUpdate(query);
+
+               // wait to finish
+               tEnv.execute("HBase Job");
+
+               // start a batch scan job to verify contents in HBase table
+               // start a batch scan job to verify contents in HBase table
+               TableEnvironment batchTableEnv = createBatchTableEnv();
+
+               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE_2);
+               hbaseTable.setRowKey("rowkey", Integer.class);
+               hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
+               hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
+               hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
+               hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
+               hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
+               hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
+               batchTableEnv.registerTableSource("hTable", hbaseTable);
+
+               Table table = batchTableEnv.sqlQuery(
+                       "SELECT " +
+                               "  h.rowkey, " +
+                               "  h.family1.col1, " +
+                               "  h.family2.col1, " +
+                               "  h.family2.col2, " +
+                               "  h.family3.col1, " +
+                               "  h.family3.col2, " +
+                               "  h.family3.col3 " +
+                               "FROM hTable AS h"
+               );
+
+               List<Row> results = collectBatchResult(table);
+               String expected =
+                       "1,10,Hello-1,100,1.01,false,Welt-1\n" +
+                               "2,20,Hello-2,200,2.02,true,Welt-2\n" +
+                               "3,30,Hello-3,300,3.03,false,Welt-3\n" +
+                               "4,40,,400,4.04,true,Welt-4\n" +
+                               "5,50,Hello-5,500,5.05,false,Welt-5\n" +
+                               "6,60,Hello-6,600,6.06,true,Welt-6\n" +
+                               "7,70,Hello-7,700,7.07,false,Welt-7\n" +
+                               "8,80,,800,8.08,true,Welt-8\n";
+
+               TestBaseUtils.compareResultAsText(results, expected);
+       }
+
+
+       // 
-------------------------------------------------------------------------------------
+       // HBase lookup source tests
+       // 
-------------------------------------------------------------------------------------
+
+       // prepare a source collection.
+       private static final List<Row> testData2 = new ArrayList<>();
+       private static final RowTypeInfo testTypeInfo2 = new RowTypeInfo(
+               new TypeInformation[]{Types.INT, Types.LONG, Types.STRING},
+               new String[]{"a", "b", "c"});
+
+       static {
+               testData2.add(Row.of(1, 1L, "Hi"));
+               testData2.add(Row.of(2, 2L, "Hello"));
+               testData2.add(Row.of(3, 2L, "Hello world"));
+               testData2.add(Row.of(3, 3L, "Hello world!"));
+       }
+
+       @Test
+       public void testHBaseLookupFunction() throws Exception { // looks up HBase rows through a table function registered as "hbaseLookup"
+               StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               StreamTableEnvironment streamTableEnv = 
StreamTableEnvironment.create(streamEnv, streamSettings);
+               StreamITCase.clear(); // presumably resets the shared sink results that compareWithList checks below -- TODO confirm
+
+               // prepare a source table "src" with columns a, b, c built from testData2
+               DataStream<Row> ds = 
streamEnv.fromCollection(testData2).returns(testTypeInfo2);
+               Table in = streamTableEnv.fromDataStream(ds, "a, b, c");
+               streamTableEnv.registerTable("src", in);
+
+               Map<String, String> tableProperties = hbaseTableProperties(); // connector settings + schema for TEST_TABLE_1
+               TableSource source = TableFactoryService
+                       .find(HBaseTableFactory.class, tableProperties)
+                       .createTableSource(tableProperties);
+
+               streamTableEnv.registerFunction("hbaseLookup", 
((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY})); // lookup function created for the row key field
+
+               // join every src row with the output of the lookup table function called with a (LATERAL TABLE join)
+               String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM 
src, LATERAL TABLE(hbaseLookup(a))";
+               Table result = streamTableEnv.sqlQuery(sqlQuery);
+
+               DataStream<Row> resultSet = 
streamTableEnv.toAppendStream(result, Row.class);
+               resultSet.addSink(new StreamITCase.StringSink<>());
+
+               streamEnv.execute();
+
+               List<String> expected = new ArrayList<>(); // one "a,family1.col1,family3.col3" line per src row
+               expected.add("1,10,Welt-1");
+               expected.add("2,20,Welt-2");
+               expected.add("3,30,Welt-3");
+               expected.add("3,30,Welt-3");
+
+               StreamITCase.compareWithList(expected);
+       }
+
+       @Test
+       public void testHBaseLookupTableSource() throws Exception { // joins src against the registered HBase table source with FOR SYSTEM_TIME AS OF
+               if (OLD_PLANNER.equals(planner)) {
+                       // lookup table source is only supported in blink 
planner, skip for old planner
+                       return; // NOTE(review): a plain return ends the test without running any assertion; it is not marked as skipped
+               }
+               StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               StreamTableEnvironment streamTableEnv = 
StreamTableEnvironment.create(streamEnv, streamSettings);
+               StreamITCase.clear(); // presumably resets the shared sink results that compareWithList checks below -- TODO confirm
+
+               // prepare a source table from testData2 with an appended processing-time attribute "proc"
+               String srcTableName = "src";
+               DataStream<Row> ds = 
streamEnv.fromCollection(testData2).returns(testTypeInfo2);
+               Table in = streamTableEnv.fromDataStream(ds, "a, b, c, 
proc.proctime");
+               streamTableEnv.registerTable(srcTableName, in);
+
+               Map<String, String> tableProperties = hbaseTableProperties(); // connector settings + schema for TEST_TABLE_1
+               TableSource source = TableFactoryService
+                       .find(HBaseTableFactory.class, tableProperties)
+                       .createTableSource(tableProperties);
+               streamTableEnv.registerTableSource("hbaseLookup", source);
+               // perform a temporal table join query: match h.rk against src.a as of the processing time src.proc
+               String query = "SELECT a,family1.col1, family3.col3 FROM src " +
+                       "JOIN hbaseLookup FOR SYSTEM_TIME AS OF src.proc as h 
ON src.a = h.rk";
+               Table result = streamTableEnv.sqlQuery(query);
+
+               DataStream<Row> resultSet = 
streamTableEnv.toAppendStream(result, Row.class);
+               resultSet.addSink(new StreamITCase.StringSink<>());
+
+               streamEnv.execute();
+
+               List<String> expected = new ArrayList<>(); // one "a,family1.col1,family3.col3" line per src row
+               expected.add("1,10,Welt-1");
+               expected.add("2,20,Welt-2");
+               expected.add("3,30,Welt-3");
+               expected.add("3,30,Welt-3");
+
+               StreamITCase.compareWithList(expected);
+       }
+
+       private static Map<String, String> hbaseTableProperties() { // string properties (connector settings + schema) describing TEST_TABLE_1
+               Map<String, String> properties = new HashMap<>();
+               properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
+               properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
+               properties.put(CONNECTOR_PROPERTY_VERSION, "1");
+               properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1);
+               // get zk quorum from "hbase-site.xml" in classpath
+               String hbaseZk = 
HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
+               properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
+               // schema: the INT row key column plus one nested row type per column family, in declaration order
+               String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
+               TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, 
Types.INT);
+               TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, 
F2COL2}, Types.STRING, Types.LONG);
+               TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, 
F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
+               TypeInformation[] columnTypes = new TypeInformation[]{f1, 
Types.INT, f2, f3}; // positions line up with columnNames above
+
+               DescriptorProperties descriptorProperties = new 
DescriptorProperties(true); // NOTE(review): 'true' presumably enables key normalization -- confirm against DescriptorProperties
+               TableSchema tableSchema = new TableSchema(columnNames, 
columnTypes);
+               descriptorProperties.putTableSchema(SCHEMA, tableSchema);
+               descriptorProperties.putProperties(properties);
+               return descriptorProperties.asMap();
+       }
+
+       // ------------------------------- Utilities -------------------------------------------------
+
+       /**
+        * Creates a batch {@link TableEnvironment} depending on the {@link #planner} context.
+        *
+        * <p>For {@code OLD_PLANNER} this is a {@link BatchTableEnvironment} created on top of
+        * an {@link ExecutionEnvironment} with a fresh {@link TableConfig}; otherwise the
+        * environment is created from {@code batchSettings}.
+        *
+        * @return the batch table environment matching the current planner
+        */
+       private TableEnvironment createBatchTableEnv() {
+               if (OLD_PLANNER.equals(planner)) {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       return BatchTableEnvironment.create(env, new 
TableConfig());
+               } else {
+                       return TableEnvironment.create(batchSettings);
+               }
+       }
+
+       /**
+        * Collects the rows of a batch {@link Table} depending on the {@link #planner} context.
+        *
+        * <p>For {@code OLD_PLANNER} the table is converted to a {@link DataSet} through the
+        * {@link BatchTableEnvironment} it belongs to and then collected; otherwise
+        * {@code TableUtil.collect} is used and its result is converted with
+        * {@code JavaScalaConversionUtil.toJava}.
+        *
+        * @param table the table to collect; it is cast to {@link TableImpl}
+        * @return the collected rows
+        */
+       private List<Row> collectBatchResult(Table table) throws Exception {
+               TableImpl tableImpl = (TableImpl) table;
+               if (OLD_PLANNER.equals(planner)) {
+                       BatchTableEnvironment batchTableEnv = 
(BatchTableEnvironment) tableImpl.getTableEnvironment();
+                       DataSet<Row> resultSet = batchTableEnv.toDataSet(table, 
Row.class);
+                       return resultSet.collect();
+               } else {
+                       return 
JavaScalaConversionUtil.toJava(TableUtil.collect(tableImpl));
+               }
+       }
+
        /**
         * A {@link ScalarFunction} that maps byte arrays to UTF-8 strings.
         */
        public static class ToUTF8 extends ScalarFunction {
+               private static final long serialVersionUID = 1L;
 
                public String eval(byte[] bytes) {
                        return Bytes.toString(bytes);
@@ -305,15 +480,18 @@ public class HBaseConnectorITCase extends 
HBaseTestingClusterAutostarter {
         * A {@link ScalarFunction} that maps byte array to longs.
         */
        public static class ToLong extends ScalarFunction {
+               private static final long serialVersionUID = 1L;
 
                public long eval(byte[] bytes) {
                        return Bytes.toLong(bytes);
                }
        }
 
-       // ######## TableInputFormat tests ############
-
-       class InputFormatForTestTable extends TableInputFormat<Tuple1<Integer>> 
{
+       /**
+        * A {@link TableInputFormat} for testing.
+        */
+       public static class InputFormatForTestTable extends 
TableInputFormat<Tuple1<Integer>> {
+               private static final long serialVersionUID = 1L;
 
                @Override
                protected Scan getScanner() {
@@ -322,7 +500,7 @@ public class HBaseConnectorITCase extends 
HBaseTestingClusterAutostarter {
 
                @Override
                protected String getTableName() {
-                       return TEST_TABLE;
+                       return TEST_TABLE_1;
                }
 
                @Override
@@ -331,51 +509,4 @@ public class HBaseConnectorITCase extends 
HBaseTestingClusterAutostarter {
                }
        }
 
-       @Test
-       public void testTableInputFormat() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(4);
-
-               DataSet<Tuple1<Integer>> result = env
-                       .createInput(new InputFormatForTestTable())
-                       .reduce(new ReduceFunction<Tuple1<Integer>>(){
-
-                               @Override
-                               public Tuple1<Integer> reduce(Tuple1<Integer> 
v1, Tuple1<Integer> v2) throws Exception {
-                                       return Tuple1.of(v1.f0 + v2.f0);
-                               }
-                       });
-
-               List<Tuple1<Integer>> resultSet = result.collect();
-
-               assertEquals(1, resultSet.size());
-               assertEquals(360, (int) resultSet.get(0).f0);
-       }
-
-       /**
-        * Allows the tests to use {@link 
ExecutionEnvironment#getExecutionEnvironment()} but with a
-        * configuration that limits the maximum memory used for network 
buffers since the current
-        * defaults are too high for Travis-CI.
-        */
-       private abstract static class LimitNetworkBuffersTestEnvironment 
extends ExecutionEnvironment {
-
-               public static void setAsContext() {
-                       Configuration config = new Configuration();
-                       // the default network buffers size (10% of heap max =~ 
150MB) seems to much for this test case
-                       
config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, 
String.valueOf(80L << 20)); // 80 MB
-                       final LocalEnvironment le = new 
LocalEnvironment(config);
-
-                       initializeContextEnvironment(new 
ExecutionEnvironmentFactory() {
-                               @Override
-                               public ExecutionEnvironment 
createExecutionEnvironment() {
-                                       return le;
-                               }
-                       });
-               }
-
-               public static void unsetAsContext() {
-                       resetContextEnvironment();
-               }
-       }
-
 }
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
deleted file mode 100644
index 731cf31..0000000
--- 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.runtime.utils.StreamITCase;
-import org.apache.flink.table.sources.TableSource;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
-import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
-import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
-import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-
-/**
- * IT case Test HBaseLookupFunction.
- */
-public class HBaseLookupFunctionITCase extends HBaseTestingClusterAutostarter {
-       private static final String ROWKEY = "rk";
-       private static final String FAMILY1 = "family1";
-       private static final String F1COL1 = "col1";
-
-       private static final String FAMILY2 = "family2";
-       private static final String F2COL1 = "col1";
-       private static final String F2COL2 = "col2";
-
-       private static final String FAMILY3 = "family3";
-       private static final String F3COL1 = "col1";
-       private static final String F3COL2 = "col2";
-       private static final String F3COL3 = "col3";
-
-       private static final String HTABLE_NAME = "testSrcHBaseTable1";
-
-       // prepare a source collection.
-       private static final List<Row> testData1 = new ArrayList<>();
-
-       static {
-               testData1.add(Row.of(1, 1L, "Hi"));
-               testData1.add(Row.of(2, 2L, "Hello"));
-               testData1.add(Row.of(3, 2L, "Hello world"));
-               testData1.add(Row.of(3, 3L, "Hello world!"));
-       }
-
-       private static final TypeInformation<?>[] testTypes1 = 
{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO};
-       private static final String[] testColumns1 = {"a", "b", "c"};
-       private static final RowTypeInfo testTypeInfo1 = new 
RowTypeInfo(testTypes1, testColumns1);
-
-       @BeforeClass
-       public static void activateHBaseCluster() throws IOException {
-               registerHBaseMiniClusterInClasspath();
-               prepareHBaseTableWithData();
-       }
-
-       private static void prepareHBaseTableWithData() throws IOException {
-               // create a table
-               TableName tableName = TableName.valueOf(HTABLE_NAME);
-               // column families
-               byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), 
Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)};
-               // split keys
-               byte[][] splitKeys = new byte[][]{Bytes.toBytes(4)};
-               createTable(tableName, families, splitKeys);
-
-               // get the HTable instance
-               HTable table = openTable(tableName);
-               List<Put> puts = new ArrayList<>();
-               // add some data
-               puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-               puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-               puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-               puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
-               puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-               puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-               puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-               puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
-
-               // append rows to table
-               table.put(puts);
-               table.close();
-       }
-
-       private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, 
double f3c1, boolean f3c2, String f3c3) {
-               Put put = new Put(Bytes.toBytes(rowKey));
-               // family 1
-               put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), 
Bytes.toBytes(f1c1));
-               // family 2
-               if (f2c1 != null) {
-                       put.addColumn(Bytes.toBytes(FAMILY2), 
Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
-               }
-               put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), 
Bytes.toBytes(f2c2));
-               // family 3
-               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), 
Bytes.toBytes(f3c1));
-               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), 
Bytes.toBytes(f3c2));
-               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), 
Bytes.toBytes(f3c3));
-
-               return put;
-       }
-
-       private static Map<String, String> hbaseTableProperties() {
-               Map<String, String> properties = new HashMap<>();
-               properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
-               properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
-               properties.put(CONNECTOR_PROPERTY_VERSION, "1");
-               properties.put(CONNECTOR_TABLE_NAME, HTABLE_NAME);
-               // get zk quorum from "hbase-site.xml" in classpath
-               String hbaseZk = 
HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
-               properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
-               // schema
-               String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
-               TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, 
Types.INT);
-               TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, 
F2COL2}, Types.STRING, Types.LONG);
-               TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, 
F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
-               TypeInformation[] columnTypes = new TypeInformation[]{f1, 
Types.INT, f2, f3};
-
-               DescriptorProperties descriptorProperties = new 
DescriptorProperties(true);
-               TableSchema tableSchema = new TableSchema(columnNames, 
columnTypes);
-               descriptorProperties.putTableSchema(SCHEMA, tableSchema);
-               descriptorProperties.putProperties(properties);
-               return descriptorProperties.asMap();
-       }
-
-       @Test
-       public void testHBaseLookupFunction() throws Exception {
-               StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               streamEnv.setParallelism(4);
-               StreamTableEnvironment streamTableEnv = 
StreamTableEnvironment.create(streamEnv);
-               StreamITCase.clear();
-
-               // prepare a source table
-               String srcTableName = "testStreamSrcTable1";
-               DataStream<Row> ds = 
streamEnv.fromCollection(testData1).returns(testTypeInfo1);
-               Table in = streamTableEnv.fromDataStream(ds, String.join(",", 
testColumns1));
-               streamTableEnv.registerTable(srcTableName, in);
-
-               Map<String, String> tableProperties = hbaseTableProperties();
-               TableSource source = TableFactoryService
-                       .find(HBaseTableFactory.class, tableProperties)
-                       .createTableSource(tableProperties);
-
-               streamTableEnv.registerFunction("hbaseLookup", 
((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY}));
-
-               // perform a temporal table join query
-               String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM 
testStreamSrcTable1, LATERAL TABLE(hbaseLookup(a))";
-               Table result = streamTableEnv.sqlQuery(sqlQuery);
-
-               DataStream<Row> resultSet = 
streamTableEnv.toAppendStream(result, Row.class);
-               resultSet.addSink(new StreamITCase.StringSink<>());
-
-               streamEnv.execute();
-
-               List<String> expected = new ArrayList<>();
-               expected.add("1,10,Welt-1");
-               expected.add("2,20,Welt-2");
-               expected.add("3,30,Welt-3");
-               expected.add("3,30,Welt-3");
-
-               StreamITCase.compareWithList(expected);
-       }
-}
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java
deleted file mode 100644
index ef1445d..0000000
--- 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.runtime.utils.StreamITCase;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-
-/**
- * IT case Test for {@link HBaseUpsertTableSink}.
- */
-public class HBaseSinkITCase extends HBaseTestingClusterAutostarter {
-       private static final long serialVersionUID = 1L;
-
-       private static final String TEST_TABLE = "testTable";
-
-       private static final String FAMILY1 = "family1";
-       private static final String F1COL1 = "col1";
-
-       private static final String FAMILY2 = "family2";
-       private static final String F2COL1 = "col1";
-       private static final String F2COL2 = "col2";
-
-       private static final String FAMILY3 = "family3";
-       private static final String F3COL1 = "col1";
-       private static final String F3COL2 = "col2";
-       private static final String F3COL3 = "col3";
-
-       // prepare a source collection.
-       private static final List<Row> testData1 = new ArrayList<>();
-       private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
-               new TypeInformation[]{Types.INT, Types.INT, Types.STRING, 
Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
-               new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", 
"f3c3"});
-
-       static {
-               testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, 
"Welt-1"));
-               testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, 
"Welt-2"));
-               testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, 
"Welt-3"));
-               testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
-               testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, 
"Welt-5"));
-               testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, 
"Welt-6"));
-               testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, 
"Welt-7"));
-               testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
-       }
-
-       @BeforeClass
-       public static void activateHBaseCluster() throws IOException {
-               registerHBaseMiniClusterInClasspath();
-               createHBaseTable();
-       }
-
-       private static void createHBaseTable() {
-               // create a table
-               TableName tableName = TableName.valueOf(TEST_TABLE);
-               // column families
-               byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), 
Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)};
-               // split keys
-               byte[][] splitKeys = new byte[][]{Bytes.toBytes(4)};
-               createTable(tableName, families, splitKeys);
-       }
-
-       @Test
-       public void testTableSink() throws Exception {
-               HBaseTableSchema schema = new HBaseTableSchema();
-               schema.addColumn(FAMILY1, F1COL1, Integer.class);
-               schema.addColumn(FAMILY2, F2COL1, String.class);
-               schema.addColumn(FAMILY2, F2COL2, Long.class);
-               schema.setRowKey("rk", Integer.class);
-               schema.addColumn(FAMILY3, F3COL1, Double.class);
-               schema.addColumn(FAMILY3, F3COL2, Boolean.class);
-               schema.addColumn(FAMILY3, F3COL3, String.class);
-
-               Map<String, String> tableProperties = new HashMap<>();
-               tableProperties.put("connector.type", "hbase");
-               tableProperties.put("connector.version", "1.4.3");
-               tableProperties.put("connector.property-version", "1");
-               tableProperties.put("connector.table-name", TEST_TABLE);
-               tableProperties.put("connector.zookeeper.quorum", 
getZookeeperQuorum());
-               tableProperties.put("connector.zookeeper.znode.parent", 
"/hbase");
-               DescriptorProperties descriptorProperties = new 
DescriptorProperties(true);
-               descriptorProperties.putTableSchema(SCHEMA, 
schema.convertsToTableSchema());
-               descriptorProperties.putProperties(tableProperties);
-               TableSink tableSink = TableFactoryService
-                       .find(HBaseTableFactory.class, 
descriptorProperties.asMap())
-                       .createTableSink(descriptorProperties.asMap());
-
-               StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               execEnv.setParallelism(4);
-               StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(execEnv);
-               StreamITCase.clear();
-
-               DataStream<Row> ds = 
execEnv.fromCollection(testData1).returns(testTypeInfo1);
-               tEnv.registerDataStream("src", ds);
-               tEnv.registerTableSink("hbase", tableSink);
-
-               String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, 
f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
-               tEnv.sqlUpdate(query);
-
-               // wait to finish
-               tEnv.execute("HBase Job");
-
-               // start a batch scan job to verify contents in HBase table
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(4);
-               BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, new TableConfig());
-
-               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE);
-               hbaseTable.setRowKey("rowkey", Integer.class);
-               hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
-               hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
-               hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
-               hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
-               hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
-               hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-               tableEnv.registerTableSource("hTable", hbaseTable);
-
-               Table result = tableEnv.sqlQuery(
-                       "SELECT " +
-                               "  h.rowkey, " +
-                               "  h.family1.col1, " +
-                               "  h.family2.col1, " +
-                               "  h.family2.col2, " +
-                               "  h.family3.col1, " +
-                               "  h.family3.col2, " +
-                               "  h.family3.col3 " +
-                               "FROM hTable AS h"
-               );
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-
-               String expected =
-                               "1,10,Hello-1,100,1.01,false,Welt-1\n" +
-                               "2,20,Hello-2,200,2.02,true,Welt-2\n" +
-                               "3,30,Hello-3,300,3.03,false,Welt-3\n" +
-                               "4,40,,400,4.04,true,Welt-4\n" +
-                               "5,50,Hello-5,500,5.05,false,Welt-5\n" +
-                               "6,60,Hello-6,600,6.06,true,Welt-6\n" +
-                               "7,70,Hello-7,700,7.07,false,Welt-7\n" +
-                               "8,80,,800,8.08,true,Welt-8\n";
-
-               TestBaseUtils.compareResultAsText(results, expected);
-       }
-}
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
new file mode 100644
index 0000000..32baf0c
--- /dev/null
+++ 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.addons.hbase.util.PlannerType.BLINK_PLANNER;
+import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER;
+
+/**
+ * Abstract IT case class for HBase.
+ *
+ * <p>Each test runs once per {@link PlannerType}. Before the first test of a class,
+ * {@code testTable1} is created and filled with eight rows and {@code testTable2} is created empty.
+ */
+@RunWith(Parameterized.class)
+public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
+
+       protected static final String TEST_TABLE_1 = "testTable1";
+       protected static final String TEST_TABLE_2 = "testTable2";
+
+       protected static final String ROWKEY = "rk";
+       protected static final String FAMILY1 = "family1";
+       protected static final String F1COL1 = "col1";
+
+       protected static final String FAMILY2 = "family2";
+       protected static final String F2COL1 = "col1";
+       protected static final String F2COL2 = "col2";
+
+       protected static final String FAMILY3 = "family3";
+       protected static final String F3COL1 = "col1";
+       protected static final String F3COL2 = "col2";
+       protected static final String F3COL3 = "col3";
+
+       private static final byte[][] FAMILIES = new byte[][]{
+               Bytes.toBytes(FAMILY1),
+               Bytes.toBytes(FAMILY2),
+               Bytes.toBytes(FAMILY3)
+       };
+
+       // Row keys are int-encoded (see putRow), so the split key is an int-encoded value as well.
+       private static final byte[][] SPLIT_KEYS = new byte[][]{Bytes.toBytes(4)};
+
+       /** Planner injected by the {@link Parameterized} runner for the current run. */
+       @Parameterized.Parameter
+       public PlannerType planner;
+
+       /** Streaming-mode settings for {@link #planner}; assigned in {@link #before()}. */
+       protected EnvironmentSettings streamSettings;
+
+       /** Batch-mode settings for {@link #planner}; assigned in {@link #before()}. */
+       protected EnvironmentSettings batchSettings;
+
+       /** Returns the planners every test is run against. */
+       @Parameterized.Parameters(name = "planner = {0}")
+       public static PlannerType[] getPlanner() {
+               return new PlannerType[]{BLINK_PLANNER, OLD_PLANNER};
+       }
+
+       /**
+        * Calls {@code registerHBaseMiniClusterInClasspath()} and then creates the test tables.
+        *
+        * @throws IOException if preparing the test tables fails
+        */
+       @BeforeClass
+       public static void activateHBaseCluster() throws IOException {
+               registerHBaseMiniClusterInClasspath();
+               prepareTables();
+       }
+
+       /**
+        * Builds the stream and batch {@link EnvironmentSettings} for the current {@link #planner}.
+        *
+        * @throws IllegalArgumentException if the planner is neither Blink nor the old planner
+        */
+       @Before
+       public void before() {
+               EnvironmentSettings.Builder streamBuilder = EnvironmentSettings.newInstance().inStreamingMode();
+               EnvironmentSettings.Builder batchBuilder = EnvironmentSettings.newInstance().inBatchMode();
+               if (BLINK_PLANNER.equals(planner)) {
+                       this.streamSettings = streamBuilder.useBlinkPlanner().build();
+                       this.batchSettings = batchBuilder.useBlinkPlanner().build();
+               } else if (OLD_PLANNER.equals(planner)) {
+                       this.streamSettings = streamBuilder.useOldPlanner().build();
+                       this.batchSettings = batchBuilder.useOldPlanner().build();
+               } else {
+                       throw new IllegalArgumentException("Unsupported planner name " + planner);
+               }
+       }
+
+       /** Creates both test tables; only the first one is filled with rows. */
+       private static void prepareTables() throws IOException {
+               createHBaseTable1();
+               createHBaseTable2();
+       }
+
+       /**
+        * Creates {@code testTable1} and writes eight rows with row keys 1 to 8 into it.
+        *
+        * @throws IOException if writing to or closing the table fails
+        */
+       private static void createHBaseTable1() throws IOException {
+               // create a table
+               TableName tableName = TableName.valueOf(TEST_TABLE_1);
+               createTable(tableName, FAMILIES, SPLIT_KEYS);
+
+               // build the rows first; rows 4 and 8 pass null so that family2:col1 is left unset
+               List<Put> puts = new ArrayList<>();
+               puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
+               puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
+               puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
+               puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
+               puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
+               puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
+               puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
+               puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
+
+               // try-with-resources closes the table even when appending the rows fails
+               try (HTable table = openTable(tableName)) {
+                       table.put(puts);
+               }
+       }
+
+       /** Creates {@code testTable2} with the same families and split keys and writes no rows to it. */
+       private static void createHBaseTable2() {
+               // create a table
+               TableName tableName = TableName.valueOf(TEST_TABLE_2);
+               createTable(tableName, FAMILIES, SPLIT_KEYS);
+       }
+
+       /**
+        * Builds the {@link Put} for one test row.
+        *
+        * <p>{@code family2:col1} is skipped when {@code f2c1} is {@code null}; every other cell is always written.
+        */
+       private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) {
+               Put put = new Put(Bytes.toBytes(rowKey));
+               // family 1
+               put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1));
+               // family 2
+               if (f2c1 != null) {
+                       put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
+               }
+               put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2));
+               // family 3
+               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1));
+               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2));
+               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3));
+
+               return put;
+       }
+}
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java
similarity index 97%
rename from 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
rename to 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java
index 905f80d..b515c2b 100644
--- 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java
@@ -18,9 +18,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.addons.hbase;
+package org.apache.flink.addons.hbase.util;
 
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.test.util.AbstractTestBase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,7 +42,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.MalformedURLException;
@@ -74,9 +73,9 @@ import static org.junit.Assert.fail;
 //
 // 
https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
 //
-public class HBaseTestingClusterAutostarter extends TestLogger implements 
Serializable {
+public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase {
 
-       private static final Log LOG = 
LogFactory.getLog(HBaseTestingClusterAutostarter.class);
+       private static final Log LOG = 
LogFactory.getLog(HBaseTestingClusterAutoStarter.class);
 
        private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
        private static HBaseAdmin admin = null;
@@ -160,7 +159,7 @@ public class HBaseTestingClusterAutostarter extends 
TestLogger implements Serial
        /**
         * Returns zookeeper quorum value contains the right port number 
(varies per run).
         */
-       static String getZookeeperQuorum() {
+       protected static String getZookeeperQuorum() {
                return "localhost:" + TEST_UTIL.getZkCluster().getClientPort();
        }
 
diff --git 
a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java
 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java
new file mode 100644
index 0000000..cd1bf39
--- /dev/null
+++ 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+/**
+ * Table planner a parameterized HBase test runs against: the Blink planner or the old planner.
+ */
+public enum PlannerType {
+       BLINK_PLANNER,
+       OLD_PLANNER
+}

Reply via email to