wuchong commented on a change in pull request #9275: 
[FLINK-13290][table-planner-blink][hbase] SinkCodeGenerator should not compare 
row type field names and enable blink planner for hbase IT case
URL: https://github.com/apache/flink/pull/9275#discussion_r309632962
 
 

 ##########
 File path: 
flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
 ##########
 @@ -20,281 +18,464 @@
 
 package org.apache.flink.addons.hbase;
 
+import org.apache.flink.addons.hbase.util.HBaseTestBase;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.runtime.utils.TableUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-
+import java.util.Map;
+
+import static org.apache.flink.addons.hbase.util.PlannerType.BLINK_PLANNER;
+import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
+import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
+import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static 
org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.junit.Assert.assertEquals;
 
 /**
- * This class contains integrations tests for multiple HBase connectors:
- * - TableInputFormat
- * - HBaseTableSource
- *
- * <p>These tests are located in a single test file to avoided unnecessary 
initializations of the
- * HBaseTestingCluster which takes about half a minute.
- *
+ * IT cases for HBase connector (including HBaseTableSource and 
HBaseTableSink).
  */
-public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
-
-       private static final String TEST_TABLE = "testTable";
-
-       private static final String FAMILY1 = "family1";
-       private static final String F1COL1 = "col1";
-
-       private static final String FAMILY2 = "family2";
-       private static final String F2COL1 = "col1";
-       private static final String F2COL2 = "col2";
-
-       private static final String FAMILY3 = "family3";
-       private static final String F3COL1 = "col1";
-       private static final String F3COL2 = "col2";
-       private static final String F3COL3 = "col3";
-
-       @BeforeClass
-       public static void activateHBaseCluster() throws IOException {
-               registerHBaseMiniClusterInClasspath();
-               prepareTable();
-               LimitNetworkBuffersTestEnvironment.setAsContext();
-       }
-
-       @AfterClass
-       public static void resetExecutionEnvironmentFactory() {
-               LimitNetworkBuffersTestEnvironment.unsetAsContext();
-       }
-
-       private static void prepareTable() throws IOException {
-
-               // create a table
-               TableName tableName = TableName.valueOf(TEST_TABLE);
-               // column families
-               byte[][] families = new byte[][]{
-                       Bytes.toBytes(FAMILY1),
-                       Bytes.toBytes(FAMILY2),
-                       Bytes.toBytes(FAMILY3)
-               };
-               // split keys
-               byte[][] splitKeys = new byte[][]{ Bytes.toBytes(4) };
-               createTable(tableName, families, splitKeys);
-
-               // get the HTable instance
-               HTable table = openTable(tableName);
-               List<Put> puts = new ArrayList<>();
-               // add some data
-               puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
-               puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
-               puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
-               puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
-               puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
-               puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
-               puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
-               puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
-
-               // append rows to table
-               table.put(puts);
-               table.close();
-       }
-
-       private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, 
double f3c1, boolean f3c2, String f3c3) {
-               Put put = new Put(Bytes.toBytes(rowKey));
-               // family 1
-               put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), 
Bytes.toBytes(f1c1));
-               // family 2
-               if (f2c1 != null) {
-                       put.addColumn(Bytes.toBytes(FAMILY2), 
Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1));
-               }
-               put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), 
Bytes.toBytes(f2c2));
-               // family 3
-               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), 
Bytes.toBytes(f3c1));
-               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), 
Bytes.toBytes(f3c2));
-               put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), 
Bytes.toBytes(f3c3));
-
-               return put;
-       }
+public class HBaseConnectorITCase extends HBaseTestBase {
 
-       // ######## HBaseTableSource tests ############
+       // 
-------------------------------------------------------------------------------------
+       // HBaseTableSource tests
+       // 
-------------------------------------------------------------------------------------
 
        @Test
        public void testTableSourceFullScan() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(4);
-               BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, new TableConfig());
-               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE);
+               TableEnvironment tEnv = createBatchTableEnv();
+               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE_1);
                hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
                hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
                hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
                hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
                hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
                hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-               tableEnv.registerTableSource("hTable", hbaseTable);
-
-               Table result = tableEnv.sqlQuery(
-                       "SELECT " +
-                               "  h.family1.col1, " +
-                               "  h.family2.col1, " +
-                               "  h.family2.col2, " +
-                               "  h.family3.col1, " +
-                               "  h.family3.col2, " +
-                               "  h.family3.col3 " +
-                               "FROM hTable AS h"
-               );
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
-
+               tEnv.registerTableSource("hTable", hbaseTable);
+
+               Table table = tEnv.sqlQuery("SELECT " +
+                       "  h.family1.col1, " +
+                       "  h.family2.col1, " +
+                       "  h.family2.col2, " +
+                       "  h.family3.col1, " +
+                       "  h.family3.col2, " +
+                       "  h.family3.col3 " +
+                       "FROM hTable AS h");
+
+               List<Row> results = collectBatchResult(table);
                String expected =
                        "10,Hello-1,100,1.01,false,Welt-1\n" +
-                       "20,Hello-2,200,2.02,true,Welt-2\n" +
-                       "30,Hello-3,300,3.03,false,Welt-3\n" +
-                       "40,null,400,4.04,true,Welt-4\n" +
-                       "50,Hello-5,500,5.05,false,Welt-5\n" +
-                       "60,Hello-6,600,6.06,true,Welt-6\n" +
-                       "70,Hello-7,700,7.07,false,Welt-7\n" +
-                       "80,null,800,8.08,true,Welt-8\n";
+                               "20,Hello-2,200,2.02,true,Welt-2\n" +
+                               "30,Hello-3,300,3.03,false,Welt-3\n" +
+                               "40,null,400,4.04,true,Welt-4\n" +
+                               "50,Hello-5,500,5.05,false,Welt-5\n" +
+                               "60,Hello-6,600,6.06,true,Welt-6\n" +
+                               "70,Hello-7,700,7.07,false,Welt-7\n" +
+                               "80,null,800,8.08,true,Welt-8\n";
 
                TestBaseUtils.compareResultAsText(results, expected);
        }
 
        @Test
        public void testTableSourceProjection() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(4);
-               BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, new TableConfig());
-               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE);
+               TableEnvironment tEnv = createBatchTableEnv();
+               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE_1);
                hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
                hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
                hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
                hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
                hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
                hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-               tableEnv.registerTableSource("hTable", hbaseTable);
+               tEnv.registerTableSource("hTable", hbaseTable);
 
-               Table result = tableEnv.sqlQuery(
-                       "SELECT " +
-                               "  h.family1.col1, " +
-                               "  h.family3.col1, " +
-                               "  h.family3.col2, " +
-                               "  h.family3.col3 " +
-                               "FROM hTable AS h"
-               );
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
+               Table table = tEnv.sqlQuery("SELECT " +
+                       "  h.family1.col1, " +
+                       "  h.family3.col1, " +
+                       "  h.family3.col2, " +
+                       "  h.family3.col3 " +
+                       "FROM hTable AS h");
 
+               List<Row> results = collectBatchResult(table);
                String expected =
                        "10,1.01,false,Welt-1\n" +
-                       "20,2.02,true,Welt-2\n" +
-                       "30,3.03,false,Welt-3\n" +
-                       "40,4.04,true,Welt-4\n" +
-                       "50,5.05,false,Welt-5\n" +
-                       "60,6.06,true,Welt-6\n" +
-                       "70,7.07,false,Welt-7\n" +
-                       "80,8.08,true,Welt-8\n";
+                               "20,2.02,true,Welt-2\n" +
+                               "30,3.03,false,Welt-3\n" +
+                               "40,4.04,true,Welt-4\n" +
+                               "50,5.05,false,Welt-5\n" +
+                               "60,6.06,true,Welt-6\n" +
+                               "70,7.07,false,Welt-7\n" +
+                               "80,8.08,true,Welt-8\n";
 
                TestBaseUtils.compareResultAsText(results, expected);
        }
 
        @Test
        public void testTableSourceFieldOrder() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(4);
-               BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, new TableConfig());
-               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE);
+               TableEnvironment tEnv = createBatchTableEnv();
+               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE_1);
                // shuffle order of column registration
                hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
                hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
                hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
                hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
                hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
                hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
-               tableEnv.registerTableSource("hTable", hbaseTable);
+               tEnv.registerTableSource("hTable", hbaseTable);
 
-               Table result = tableEnv.sqlQuery(
-                       "SELECT * FROM hTable AS h"
-               );
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
+               Table table = tEnv.sqlQuery("SELECT * FROM hTable AS h");
 
+               List<Row> results = collectBatchResult(table);
                String expected =
                        "Hello-1,100,1.01,false,Welt-1,10\n" +
-                       "Hello-2,200,2.02,true,Welt-2,20\n" +
-                       "Hello-3,300,3.03,false,Welt-3,30\n" +
-                       "null,400,4.04,true,Welt-4,40\n" +
-                       "Hello-5,500,5.05,false,Welt-5,50\n" +
-                       "Hello-6,600,6.06,true,Welt-6,60\n" +
-                       "Hello-7,700,7.07,false,Welt-7,70\n" +
-                       "null,800,8.08,true,Welt-8,80\n";
+                               "Hello-2,200,2.02,true,Welt-2,20\n" +
+                               "Hello-3,300,3.03,false,Welt-3,30\n" +
+                               "null,400,4.04,true,Welt-4,40\n" +
+                               "Hello-5,500,5.05,false,Welt-5,50\n" +
+                               "Hello-6,600,6.06,true,Welt-6,60\n" +
+                               "Hello-7,700,7.07,false,Welt-7,70\n" +
+                               "null,800,8.08,true,Welt-8,80\n";
 
                TestBaseUtils.compareResultAsText(results, expected);
        }
 
        @Test
        public void testTableSourceReadAsByteArray() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(4);
-               BatchTableEnvironment tableEnv = 
BatchTableEnvironment.create(env, new TableConfig());
+               TableEnvironment tEnv = createBatchTableEnv();
                // fetch row2 from the table till the end
-               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE);
+               HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), 
TEST_TABLE_1);
                hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class);
                hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class);
+               tEnv.registerTableSource("hTable", hbaseTable);
+               tEnv.registerFunction("toUTF8", new ToUTF8());
+               tEnv.registerFunction("toLong", new ToLong());
 
-               tableEnv.registerTableSource("hTable", hbaseTable);
-               tableEnv.registerFunction("toUTF8", new ToUTF8());
-               tableEnv.registerFunction("toLong", new ToLong());
-
-               Table result = tableEnv.sqlQuery(
+               Table table = tEnv.sqlQuery(
                        "SELECT " +
                                "  toUTF8(h.family2.col1), " +
                                "  toLong(h.family2.col2) " +
                                "FROM hTable AS h"
                );
-               DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-               List<Row> results = resultSet.collect();
 
+               List<Row> results = collectBatchResult(table);
                String expected =
                        "Hello-1,100\n" +
-                       "Hello-2,200\n" +
-                       "Hello-3,300\n" +
-                       "null,400\n" +
-                       "Hello-5,500\n" +
-                       "Hello-6,600\n" +
-                       "Hello-7,700\n" +
-                       "null,800\n";
+                               "Hello-2,200\n" +
+                               "Hello-3,300\n" +
+                               "null,400\n" +
+                               "Hello-5,500\n" +
+                               "Hello-6,600\n" +
+                               "Hello-7,700\n" +
+                               "null,800\n";
+
+               TestBaseUtils.compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testTableInputFormat() throws Exception {
+               if (BLINK_PLANNER.equals(planner)) {
+                       // this case is for testing TableInputFormat which is 
not works for flink-table
 
 Review comment:
   Not exactly. I mean `TableInputFormat` is a runtime implementation, not part of 
flink-table, and not related to which planner is used. You can see the test is only 
verified on `DataSet`, not on `TableEnvironment`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to