This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 61adb66107c462a60be021e21a3cb6251452fa45 Author: Jark Wu <[email protected]> AuthorDate: Tue Jul 30 18:17:39 2019 +0800 [FLINK-13290][hbase] Enable blink planner for integration tests of HBase This commit combines HBaseSinkITCase, HBaseLookupFunctionITCase and HBaseConnectorITCase into one class, which saves a lot of cluster initialization time. This closes #9275 --- flink-connectors/flink-hbase/pom.xml | 27 +- .../flink/addons/hbase/HBaseTableSource.java | 5 +- .../flink/addons/hbase/HBaseConnectorITCase.java | 601 +++++++++++++-------- .../addons/hbase/HBaseLookupFunctionITCase.java | 203 ------- .../apache/flink/addons/hbase/HBaseSinkITCase.java | 186 ------- .../flink/addons/hbase/util/HBaseTestBase.java | 150 +++++ .../HBaseTestingClusterAutoStarter.java} | 11 +- .../flink/addons/hbase/util/PlannerType.java | 27 + 8 files changed, 571 insertions(+), 639 deletions(-) diff --git a/flink-connectors/flink-hbase/pom.xml b/flink-connectors/flink-hbase/pom.xml index 685b4e0..9d9f579 100644 --- a/flink-connectors/flink-hbase/pom.xml +++ b/flink-connectors/flink-hbase/pom.xml @@ -99,14 +99,6 @@ under the License. <scope>provided</scope> <optional>true</optional> </dependency> - <!-- A planner dependency won't be necessary once FLIP-32 has been completed. --> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - <optional>true</optional> - </dependency> <dependency> <groupId>org.apache.flink</groupId> @@ -292,6 +284,25 @@ under the License. 
<groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> + <version>${project.version}</version> <type>test-jar</type> <scope>test</scope> </dependency> diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java index 3fff5fa..b1e7161 100644 --- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java +++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java @@ -180,7 +180,10 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable @Override public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) { - throw new UnsupportedOperationException("HBase table can not convert to DataStream currently."); + HBaseTableSchema projectedSchema = hbaseSchema.getProjectedHBaseTableSchema(projectFields); + return execEnv + .createInput(new HBaseRowInputFormat(conf, tableName, projectedSchema), getReturnType()) + .name(explainSource()); } @VisibleForTesting diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java 
b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java index bd992c5..6a0909e 100644 --- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java +++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java @@ -1,6 +1,4 @@ /* - * Copyright The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,212 +18,138 @@ package org.apache.flink.addons.hbase; +import org.apache.flink.addons.hbase.util.HBaseTestBase; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironmentFactory; -import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.internal.TableImpl; import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.TableFactoryService; import 
org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.planner.runtime.utils.TableUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.runtime.utils.StreamITCase; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.sources.TableSource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.Row; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; - +import java.util.Map; + +import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION; +import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME; +import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE; +import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143; +import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; import static org.junit.Assert.assertEquals; /** - * This class contains integrations tests for multiple HBase connectors: - * - TableInputFormat - * - HBaseTableSource - * - * <p>These tests 
are located in a single test file to avoided unnecessary initializations of the - * HBaseTestingCluster which takes about half a minute. - * + * IT cases for HBase connector (including HBaseTableSource and HBaseTableSink). */ -public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter { - - private static final String TEST_TABLE = "testTable"; - - private static final String FAMILY1 = "family1"; - private static final String F1COL1 = "col1"; - - private static final String FAMILY2 = "family2"; - private static final String F2COL1 = "col1"; - private static final String F2COL2 = "col2"; - - private static final String FAMILY3 = "family3"; - private static final String F3COL1 = "col1"; - private static final String F3COL2 = "col2"; - private static final String F3COL3 = "col3"; - - @BeforeClass - public static void activateHBaseCluster() throws IOException { - registerHBaseMiniClusterInClasspath(); - prepareTable(); - LimitNetworkBuffersTestEnvironment.setAsContext(); - } - - @AfterClass - public static void resetExecutionEnvironmentFactory() { - LimitNetworkBuffersTestEnvironment.unsetAsContext(); - } - - private static void prepareTable() throws IOException { - - // create a table - TableName tableName = TableName.valueOf(TEST_TABLE); - // column families - byte[][] families = new byte[][]{ - Bytes.toBytes(FAMILY1), - Bytes.toBytes(FAMILY2), - Bytes.toBytes(FAMILY3) - }; - // split keys - byte[][] splitKeys = new byte[][]{ Bytes.toBytes(4) }; - createTable(tableName, families, splitKeys); - - // get the HTable instance - HTable table = openTable(tableName); - List<Put> puts = new ArrayList<>(); - // add some data - puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1")); - puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2")); - puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3")); - puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4")); - puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5")); - 
puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6")); - puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7")); - puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8")); - - // append rows to table - table.put(puts); - table.close(); - } - - private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) { - Put put = new Put(Bytes.toBytes(rowKey)); - // family 1 - put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1)); - // family 2 - if (f2c1 != null) { - put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1)); - } - put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2)); - // family 3 - put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1)); - put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2)); - put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3)); - - return put; - } +public class HBaseConnectorITCase extends HBaseTestBase { - // ######## HBaseTableSource tests ############ + // ------------------------------------------------------------------------------------- + // HBaseTableSource tests + // ------------------------------------------------------------------------------------- @Test public void testTableSourceFullScan() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(4); - BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig()); - HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE); + TableEnvironment tEnv = createBatchTableEnv(); + HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1); hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class); hbaseTable.addColumn(FAMILY2, F2COL1, String.class); hbaseTable.addColumn(FAMILY2, F2COL2, Long.class); hbaseTable.addColumn(FAMILY3, F3COL1, 
Double.class); hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class); hbaseTable.addColumn(FAMILY3, F3COL3, String.class); - tableEnv.registerTableSource("hTable", hbaseTable); - - Table result = tableEnv.sqlQuery( - "SELECT " + - " h.family1.col1, " + - " h.family2.col1, " + - " h.family2.col2, " + - " h.family3.col1, " + - " h.family3.col2, " + - " h.family3.col3 " + - "FROM hTable AS h" - ); - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - + tEnv.registerTableSource("hTable", hbaseTable); + + Table table = tEnv.sqlQuery("SELECT " + + " h.family1.col1, " + + " h.family2.col1, " + + " h.family2.col2, " + + " h.family3.col1, " + + " h.family3.col2, " + + " h.family3.col3 " + + "FROM hTable AS h"); + + List<Row> results = collectBatchResult(table); String expected = "10,Hello-1,100,1.01,false,Welt-1\n" + - "20,Hello-2,200,2.02,true,Welt-2\n" + - "30,Hello-3,300,3.03,false,Welt-3\n" + - "40,null,400,4.04,true,Welt-4\n" + - "50,Hello-5,500,5.05,false,Welt-5\n" + - "60,Hello-6,600,6.06,true,Welt-6\n" + - "70,Hello-7,700,7.07,false,Welt-7\n" + - "80,null,800,8.08,true,Welt-8\n"; + "20,Hello-2,200,2.02,true,Welt-2\n" + + "30,Hello-3,300,3.03,false,Welt-3\n" + + "40,null,400,4.04,true,Welt-4\n" + + "50,Hello-5,500,5.05,false,Welt-5\n" + + "60,Hello-6,600,6.06,true,Welt-6\n" + + "70,Hello-7,700,7.07,false,Welt-7\n" + + "80,null,800,8.08,true,Welt-8\n"; TestBaseUtils.compareResultAsText(results, expected); } @Test public void testTableSourceProjection() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(4); - BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig()); - HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE); + TableEnvironment tEnv = createBatchTableEnv(); + HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1); hbaseTable.addColumn(FAMILY1, F1COL1, 
Integer.class); hbaseTable.addColumn(FAMILY2, F2COL1, String.class); hbaseTable.addColumn(FAMILY2, F2COL2, Long.class); hbaseTable.addColumn(FAMILY3, F3COL1, Double.class); hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class); hbaseTable.addColumn(FAMILY3, F3COL3, String.class); - tableEnv.registerTableSource("hTable", hbaseTable); + tEnv.registerTableSource("hTable", hbaseTable); - Table result = tableEnv.sqlQuery( - "SELECT " + - " h.family1.col1, " + - " h.family3.col1, " + - " h.family3.col2, " + - " h.family3.col3 " + - "FROM hTable AS h" - ); - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); + Table table = tEnv.sqlQuery("SELECT " + + " h.family1.col1, " + + " h.family3.col1, " + + " h.family3.col2, " + + " h.family3.col3 " + + "FROM hTable AS h"); + List<Row> results = collectBatchResult(table); String expected = "10,1.01,false,Welt-1\n" + - "20,2.02,true,Welt-2\n" + - "30,3.03,false,Welt-3\n" + - "40,4.04,true,Welt-4\n" + - "50,5.05,false,Welt-5\n" + - "60,6.06,true,Welt-6\n" + - "70,7.07,false,Welt-7\n" + - "80,8.08,true,Welt-8\n"; + "20,2.02,true,Welt-2\n" + + "30,3.03,false,Welt-3\n" + + "40,4.04,true,Welt-4\n" + + "50,5.05,false,Welt-5\n" + + "60,6.06,true,Welt-6\n" + + "70,7.07,false,Welt-7\n" + + "80,8.08,true,Welt-8\n"; TestBaseUtils.compareResultAsText(results, expected); } @Test public void testTableSourceFieldOrder() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(4); - BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig()); - HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE); + TableEnvironment tEnv = createBatchTableEnv(); + HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1); // shuffle order of column registration hbaseTable.addColumn(FAMILY2, F2COL1, String.class); hbaseTable.addColumn(FAMILY3, F3COL1, Double.class); @@ -233,68 +157,319 
@@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter { hbaseTable.addColumn(FAMILY2, F2COL2, Long.class); hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class); hbaseTable.addColumn(FAMILY3, F3COL3, String.class); - tableEnv.registerTableSource("hTable", hbaseTable); + tEnv.registerTableSource("hTable", hbaseTable); - Table result = tableEnv.sqlQuery( - "SELECT * FROM hTable AS h" - ); - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); + Table table = tEnv.sqlQuery("SELECT * FROM hTable AS h"); + List<Row> results = collectBatchResult(table); String expected = "Hello-1,100,1.01,false,Welt-1,10\n" + - "Hello-2,200,2.02,true,Welt-2,20\n" + - "Hello-3,300,3.03,false,Welt-3,30\n" + - "null,400,4.04,true,Welt-4,40\n" + - "Hello-5,500,5.05,false,Welt-5,50\n" + - "Hello-6,600,6.06,true,Welt-6,60\n" + - "Hello-7,700,7.07,false,Welt-7,70\n" + - "null,800,8.08,true,Welt-8,80\n"; + "Hello-2,200,2.02,true,Welt-2,20\n" + + "Hello-3,300,3.03,false,Welt-3,30\n" + + "null,400,4.04,true,Welt-4,40\n" + + "Hello-5,500,5.05,false,Welt-5,50\n" + + "Hello-6,600,6.06,true,Welt-6,60\n" + + "Hello-7,700,7.07,false,Welt-7,70\n" + + "null,800,8.08,true,Welt-8,80\n"; TestBaseUtils.compareResultAsText(results, expected); } @Test public void testTableSourceReadAsByteArray() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(4); - BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig()); + TableEnvironment tEnv = createBatchTableEnv(); // fetch row2 from the table till the end - HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE); + HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_1); hbaseTable.addColumn(FAMILY2, F2COL1, byte[].class); hbaseTable.addColumn(FAMILY2, F2COL2, byte[].class); + tEnv.registerTableSource("hTable", hbaseTable); + tEnv.registerFunction("toUTF8", new 
ToUTF8()); + tEnv.registerFunction("toLong", new ToLong()); - tableEnv.registerTableSource("hTable", hbaseTable); - tableEnv.registerFunction("toUTF8", new ToUTF8()); - tableEnv.registerFunction("toLong", new ToLong()); - - Table result = tableEnv.sqlQuery( + Table table = tEnv.sqlQuery( "SELECT " + " toUTF8(h.family2.col1), " + " toLong(h.family2.col2) " + "FROM hTable AS h" ); - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); + List<Row> results = collectBatchResult(table); String expected = "Hello-1,100\n" + - "Hello-2,200\n" + - "Hello-3,300\n" + - "null,400\n" + - "Hello-5,500\n" + - "Hello-6,600\n" + - "Hello-7,700\n" + - "null,800\n"; + "Hello-2,200\n" + + "Hello-3,300\n" + + "null,400\n" + + "Hello-5,500\n" + + "Hello-6,600\n" + + "Hello-7,700\n" + + "null,800\n"; TestBaseUtils.compareResultAsText(results, expected); } + @Test + public void testTableInputFormat() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple1<Integer>> result = env + .createInput(new InputFormatForTestTable()) + .reduce((ReduceFunction<Tuple1<Integer>>) (v1, v2) -> Tuple1.of(v1.f0 + v2.f0)); + + List<Tuple1<Integer>> resultSet = result.collect(); + + assertEquals(1, resultSet.size()); + assertEquals(360, (int) resultSet.get(0).f0); + } + + // ------------------------------------------------------------------------------------- + // HBaseTableSink tests + // ------------------------------------------------------------------------------------- + + // prepare a source collection. 
+ private static final List<Row> testData1 = new ArrayList<>(); + private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo( + new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING}, + new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"}); + + static { + testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1")); + testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2")); + testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3")); + testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4")); + testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5")); + testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6")); + testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7")); + testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8")); + } + + @Test + public void testTableSink() throws Exception { + HBaseTableSchema schema = new HBaseTableSchema(); + schema.addColumn(FAMILY1, F1COL1, Integer.class); + schema.addColumn(FAMILY2, F2COL1, String.class); + schema.addColumn(FAMILY2, F2COL2, Long.class); + schema.setRowKey("rk", Integer.class); + schema.addColumn(FAMILY3, F3COL1, Double.class); + schema.addColumn(FAMILY3, F3COL2, Boolean.class); + schema.addColumn(FAMILY3, F3COL3, String.class); + + Map<String, String> tableProperties = new HashMap<>(); + tableProperties.put("connector.type", "hbase"); + tableProperties.put("connector.version", "1.4.3"); + tableProperties.put("connector.property-version", "1"); + tableProperties.put("connector.table-name", TEST_TABLE_2); + tableProperties.put("connector.zookeeper.quorum", getZookeeperQuorum()); + tableProperties.put("connector.zookeeper.znode.parent", "/hbase"); + DescriptorProperties descriptorProperties = new DescriptorProperties(true); + descriptorProperties.putTableSchema(SCHEMA, schema.convertsToTableSchema()); + descriptorProperties.putProperties(tableProperties); + 
TableSink tableSink = TableFactoryService + .find(HBaseTableFactory.class, descriptorProperties.asMap()) + .createTableSink(descriptorProperties.asMap()); + + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings); + + DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1); + tEnv.registerDataStream("src", ds); + tEnv.registerTableSink("hbase", tableSink); + + String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src"; + tEnv.sqlUpdate(query); + + // wait to finish + tEnv.execute("HBase Job"); + + // start a batch scan job to verify contents in HBase table + // start a batch scan job to verify contents in HBase table + TableEnvironment batchTableEnv = createBatchTableEnv(); + + HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE_2); + hbaseTable.setRowKey("rowkey", Integer.class); + hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class); + hbaseTable.addColumn(FAMILY2, F2COL1, String.class); + hbaseTable.addColumn(FAMILY2, F2COL2, Long.class); + hbaseTable.addColumn(FAMILY3, F3COL1, Double.class); + hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class); + hbaseTable.addColumn(FAMILY3, F3COL3, String.class); + batchTableEnv.registerTableSource("hTable", hbaseTable); + + Table table = batchTableEnv.sqlQuery( + "SELECT " + + " h.rowkey, " + + " h.family1.col1, " + + " h.family2.col1, " + + " h.family2.col2, " + + " h.family3.col1, " + + " h.family3.col2, " + + " h.family3.col3 " + + "FROM hTable AS h" + ); + + List<Row> results = collectBatchResult(table); + String expected = + "1,10,Hello-1,100,1.01,false,Welt-1\n" + + "2,20,Hello-2,200,2.02,true,Welt-2\n" + + "3,30,Hello-3,300,3.03,false,Welt-3\n" + + "4,40,,400,4.04,true,Welt-4\n" + + "5,50,Hello-5,500,5.05,false,Welt-5\n" + + "6,60,Hello-6,600,6.06,true,Welt-6\n" + + 
"7,70,Hello-7,700,7.07,false,Welt-7\n" + + "8,80,,800,8.08,true,Welt-8\n"; + + TestBaseUtils.compareResultAsText(results, expected); + } + + + // ------------------------------------------------------------------------------------- + // HBase lookup source tests + // ------------------------------------------------------------------------------------- + + // prepare a source collection. + private static final List<Row> testData2 = new ArrayList<>(); + private static final RowTypeInfo testTypeInfo2 = new RowTypeInfo( + new TypeInformation[]{Types.INT, Types.LONG, Types.STRING}, + new String[]{"a", "b", "c"}); + + static { + testData2.add(Row.of(1, 1L, "Hi")); + testData2.add(Row.of(2, 2L, "Hello")); + testData2.add(Row.of(3, 2L, "Hello world")); + testData2.add(Row.of(3, 3L, "Hello world!")); + } + + @Test + public void testHBaseLookupFunction() throws Exception { + StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings); + StreamITCase.clear(); + + // prepare a source table + DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2); + Table in = streamTableEnv.fromDataStream(ds, "a, b, c"); + streamTableEnv.registerTable("src", in); + + Map<String, String> tableProperties = hbaseTableProperties(); + TableSource source = TableFactoryService + .find(HBaseTableFactory.class, tableProperties) + .createTableSource(tableProperties); + + streamTableEnv.registerFunction("hbaseLookup", ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY})); + + // perform a temporal table join query + String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM src, LATERAL TABLE(hbaseLookup(a))"; + Table result = streamTableEnv.sqlQuery(sqlQuery); + + DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class); + resultSet.addSink(new StreamITCase.StringSink<>()); + + streamEnv.execute(); + + 
List<String> expected = new ArrayList<>(); + expected.add("1,10,Welt-1"); + expected.add("2,20,Welt-2"); + expected.add("3,30,Welt-3"); + expected.add("3,30,Welt-3"); + + StreamITCase.compareWithList(expected); + } + + @Test + public void testHBaseLookupTableSource() throws Exception { + if (OLD_PLANNER.equals(planner)) { + // lookup table source is only supported in blink planner, skip for old planner + return; + } + StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings); + StreamITCase.clear(); + + // prepare a source table + String srcTableName = "src"; + DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2); + Table in = streamTableEnv.fromDataStream(ds, "a, b, c, proc.proctime"); + streamTableEnv.registerTable(srcTableName, in); + + Map<String, String> tableProperties = hbaseTableProperties(); + TableSource source = TableFactoryService + .find(HBaseTableFactory.class, tableProperties) + .createTableSource(tableProperties); + streamTableEnv.registerTableSource("hbaseLookup", source); + // perform a temporal table join query + String query = "SELECT a,family1.col1, family3.col3 FROM src " + + "JOIN hbaseLookup FOR SYSTEM_TIME AS OF src.proc as h ON src.a = h.rk"; + Table result = streamTableEnv.sqlQuery(query); + + DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class); + resultSet.addSink(new StreamITCase.StringSink<>()); + + streamEnv.execute(); + + List<String> expected = new ArrayList<>(); + expected.add("1,10,Welt-1"); + expected.add("2,20,Welt-2"); + expected.add("3,30,Welt-3"); + expected.add("3,30,Welt-3"); + + StreamITCase.compareWithList(expected); + } + + private static Map<String, String> hbaseTableProperties() { + Map<String, String> properties = new HashMap<>(); + properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE); + properties.put(CONNECTOR_VERSION, 
CONNECTOR_VERSION_VALUE_143); + properties.put(CONNECTOR_PROPERTY_VERSION, "1"); + properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1); + // get zk quorum from "hbase-site.xml" in classpath + String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM); + properties.put(CONNECTOR_ZK_QUORUM, hbaseZk); + // schema + String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3}; + TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT); + TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG); + TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING); + TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3}; + + DescriptorProperties descriptorProperties = new DescriptorProperties(true); + TableSchema tableSchema = new TableSchema(columnNames, columnTypes); + descriptorProperties.putTableSchema(SCHEMA, tableSchema); + descriptorProperties.putProperties(properties); + return descriptorProperties.asMap(); + } + + // ------------------------------- Utilities ------------------------------------------------- + + /** + * Creates a Batch {@link TableEnvironment} depends on the {@link #planner} context. + */ + private TableEnvironment createBatchTableEnv() { + if (OLD_PLANNER.equals(planner)) { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + return BatchTableEnvironment.create(env, new TableConfig()); + } else { + return TableEnvironment.create(batchSettings); + } + } + + /** + * Collects batch result depends on the {@link #planner} context. 
+ */ + private List<Row> collectBatchResult(Table table) throws Exception { + TableImpl tableImpl = (TableImpl) table; + if (OLD_PLANNER.equals(planner)) { + BatchTableEnvironment batchTableEnv = (BatchTableEnvironment) tableImpl.getTableEnvironment(); + DataSet<Row> resultSet = batchTableEnv.toDataSet(table, Row.class); + return resultSet.collect(); + } else { + return JavaScalaConversionUtil.toJava(TableUtil.collect(tableImpl)); + } + } + /** * A {@link ScalarFunction} that maps byte arrays to UTF-8 strings. */ public static class ToUTF8 extends ScalarFunction { + private static final long serialVersionUID = 1L; public String eval(byte[] bytes) { return Bytes.toString(bytes); @@ -305,15 +480,18 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter { * A {@link ScalarFunction} that maps byte array to longs. */ public static class ToLong extends ScalarFunction { + private static final long serialVersionUID = 1L; public long eval(byte[] bytes) { return Bytes.toLong(bytes); } } - // ######## TableInputFormat tests ############ - - class InputFormatForTestTable extends TableInputFormat<Tuple1<Integer>> { + /** + * A {@link TableInputFormat} for testing. 
+ */ + public static class InputFormatForTestTable extends TableInputFormat<Tuple1<Integer>> { + private static final long serialVersionUID = 1L; @Override protected Scan getScanner() { @@ -322,7 +500,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter { @Override protected String getTableName() { - return TEST_TABLE; + return TEST_TABLE_1; } @Override @@ -331,51 +509,4 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter { } } - @Test - public void testTableInputFormat() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(4); - - DataSet<Tuple1<Integer>> result = env - .createInput(new InputFormatForTestTable()) - .reduce(new ReduceFunction<Tuple1<Integer>>(){ - - @Override - public Tuple1<Integer> reduce(Tuple1<Integer> v1, Tuple1<Integer> v2) throws Exception { - return Tuple1.of(v1.f0 + v2.f0); - } - }); - - List<Tuple1<Integer>> resultSet = result.collect(); - - assertEquals(1, resultSet.size()); - assertEquals(360, (int) resultSet.get(0).f0); - } - - /** - * Allows the tests to use {@link ExecutionEnvironment#getExecutionEnvironment()} but with a - * configuration that limits the maximum memory used for network buffers since the current - * defaults are too high for Travis-CI. 
- */ - private abstract static class LimitNetworkBuffersTestEnvironment extends ExecutionEnvironment { - - public static void setAsContext() { - Configuration config = new Configuration(); - // the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case - config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB - final LocalEnvironment le = new LocalEnvironment(config); - - initializeContextEnvironment(new ExecutionEnvironmentFactory() { - @Override - public ExecutionEnvironment createExecutionEnvironment() { - return le; - } - }); - } - - public static void unsetAsContext() { - resetContextEnvironment(); - } - } - } diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java deleted file mode 100644 index 731cf31..0000000 --- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.addons.hbase; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.java.StreamTableEnvironment; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.factories.TableFactoryService; -import org.apache.flink.table.runtime.utils.StreamITCase; -import org.apache.flink.table.sources.TableSource; -import org.apache.flink.types.Row; - -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION; -import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME; -import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE; -import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143; -import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM; -import static 
org.apache.flink.table.descriptors.Schema.SCHEMA; - -/** - * IT case Test HBaseLookupFunction. - */ -public class HBaseLookupFunctionITCase extends HBaseTestingClusterAutostarter { - private static final String ROWKEY = "rk"; - private static final String FAMILY1 = "family1"; - private static final String F1COL1 = "col1"; - - private static final String FAMILY2 = "family2"; - private static final String F2COL1 = "col1"; - private static final String F2COL2 = "col2"; - - private static final String FAMILY3 = "family3"; - private static final String F3COL1 = "col1"; - private static final String F3COL2 = "col2"; - private static final String F3COL3 = "col3"; - - private static final String HTABLE_NAME = "testSrcHBaseTable1"; - - // prepare a source collection. - private static final List<Row> testData1 = new ArrayList<>(); - - static { - testData1.add(Row.of(1, 1L, "Hi")); - testData1.add(Row.of(2, 2L, "Hello")); - testData1.add(Row.of(3, 2L, "Hello world")); - testData1.add(Row.of(3, 3L, "Hello world!")); - } - - private static final TypeInformation<?>[] testTypes1 = {BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO}; - private static final String[] testColumns1 = {"a", "b", "c"}; - private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(testTypes1, testColumns1); - - @BeforeClass - public static void activateHBaseCluster() throws IOException { - registerHBaseMiniClusterInClasspath(); - prepareHBaseTableWithData(); - } - - private static void prepareHBaseTableWithData() throws IOException { - // create a table - TableName tableName = TableName.valueOf(HTABLE_NAME); - // column families - byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)}; - // split keys - byte[][] splitKeys = new byte[][]{Bytes.toBytes(4)}; - createTable(tableName, families, splitKeys); - - // get the HTable instance - HTable table = openTable(tableName); - List<Put> puts = new ArrayList<>(); - 
// add some data - puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1")); - puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2")); - puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3")); - puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4")); - puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5")); - puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6")); - puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7")); - puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8")); - - // append rows to table - table.put(puts); - table.close(); - } - - private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) { - Put put = new Put(Bytes.toBytes(rowKey)); - // family 1 - put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1)); - // family 2 - if (f2c1 != null) { - put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1)); - } - put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2)); - // family 3 - put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1)); - put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2)); - put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), Bytes.toBytes(f3c3)); - - return put; - } - - private static Map<String, String> hbaseTableProperties() { - Map<String, String> properties = new HashMap<>(); - properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE); - properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143); - properties.put(CONNECTOR_PROPERTY_VERSION, "1"); - properties.put(CONNECTOR_TABLE_NAME, HTABLE_NAME); - // get zk quorum from "hbase-site.xml" in classpath - String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM); - properties.put(CONNECTOR_ZK_QUORUM, hbaseZk); - // schema - String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3}; - TypeInformation<Row> f1 = 
Types.ROW_NAMED(new String[]{F1COL1}, Types.INT); - TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG); - TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING); - TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3}; - - DescriptorProperties descriptorProperties = new DescriptorProperties(true); - TableSchema tableSchema = new TableSchema(columnNames, columnTypes); - descriptorProperties.putTableSchema(SCHEMA, tableSchema); - descriptorProperties.putProperties(properties); - return descriptorProperties.asMap(); - } - - @Test - public void testHBaseLookupFunction() throws Exception { - StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - streamEnv.setParallelism(4); - StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv); - StreamITCase.clear(); - - // prepare a source table - String srcTableName = "testStreamSrcTable1"; - DataStream<Row> ds = streamEnv.fromCollection(testData1).returns(testTypeInfo1); - Table in = streamTableEnv.fromDataStream(ds, String.join(",", testColumns1)); - streamTableEnv.registerTable(srcTableName, in); - - Map<String, String> tableProperties = hbaseTableProperties(); - TableSource source = TableFactoryService - .find(HBaseTableFactory.class, tableProperties) - .createTableSource(tableProperties); - - streamTableEnv.registerFunction("hbaseLookup", ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY})); - - // perform a temporal table join query - String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM testStreamSrcTable1, LATERAL TABLE(hbaseLookup(a))"; - Table result = streamTableEnv.sqlQuery(sqlQuery); - - DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class); - resultSet.addSink(new StreamITCase.StringSink<>()); - - streamEnv.execute(); - - List<String> expected = new ArrayList<>(); - 
expected.add("1,10,Welt-1"); - expected.add("2,20,Welt-2"); - expected.add("3,30,Welt-3"); - expected.add("3,30,Welt-3"); - - StreamITCase.compareWithList(expected); - } -} diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java deleted file mode 100644 index ef1445d..0000000 --- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.addons.hbase; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.java.BatchTableEnvironment; -import org.apache.flink.table.api.java.StreamTableEnvironment; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.factories.TableFactoryService; -import org.apache.flink.table.runtime.utils.StreamITCase; -import org.apache.flink.table.sinks.TableSink; -import org.apache.flink.test.util.TestBaseUtils; -import org.apache.flink.types.Row; - -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.table.descriptors.Schema.SCHEMA; - -/** - * IT case Test for {@link HBaseUpsertTableSink}. 
- */ -public class HBaseSinkITCase extends HBaseTestingClusterAutostarter { - private static final long serialVersionUID = 1L; - - private static final String TEST_TABLE = "testTable"; - - private static final String FAMILY1 = "family1"; - private static final String F1COL1 = "col1"; - - private static final String FAMILY2 = "family2"; - private static final String F2COL1 = "col1"; - private static final String F2COL2 = "col2"; - - private static final String FAMILY3 = "family3"; - private static final String F3COL1 = "col1"; - private static final String F3COL2 = "col2"; - private static final String F3COL3 = "col3"; - - // prepare a source collection. - private static final List<Row> testData1 = new ArrayList<>(); - private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo( - new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING}, - new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"}); - - static { - testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1")); - testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2")); - testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3")); - testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4")); - testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5")); - testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6")); - testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7")); - testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8")); - } - - @BeforeClass - public static void activateHBaseCluster() throws IOException { - registerHBaseMiniClusterInClasspath(); - createHBaseTable(); - } - - private static void createHBaseTable() { - // create a table - TableName tableName = TableName.valueOf(TEST_TABLE); - // column families - byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)}; - // split keys - byte[][] splitKeys 
= new byte[][]{Bytes.toBytes(4)}; - createTable(tableName, families, splitKeys); - } - - @Test - public void testTableSink() throws Exception { - HBaseTableSchema schema = new HBaseTableSchema(); - schema.addColumn(FAMILY1, F1COL1, Integer.class); - schema.addColumn(FAMILY2, F2COL1, String.class); - schema.addColumn(FAMILY2, F2COL2, Long.class); - schema.setRowKey("rk", Integer.class); - schema.addColumn(FAMILY3, F3COL1, Double.class); - schema.addColumn(FAMILY3, F3COL2, Boolean.class); - schema.addColumn(FAMILY3, F3COL3, String.class); - - Map<String, String> tableProperties = new HashMap<>(); - tableProperties.put("connector.type", "hbase"); - tableProperties.put("connector.version", "1.4.3"); - tableProperties.put("connector.property-version", "1"); - tableProperties.put("connector.table-name", TEST_TABLE); - tableProperties.put("connector.zookeeper.quorum", getZookeeperQuorum()); - tableProperties.put("connector.zookeeper.znode.parent", "/hbase"); - DescriptorProperties descriptorProperties = new DescriptorProperties(true); - descriptorProperties.putTableSchema(SCHEMA, schema.convertsToTableSchema()); - descriptorProperties.putProperties(tableProperties); - TableSink tableSink = TableFactoryService - .find(HBaseTableFactory.class, descriptorProperties.asMap()) - .createTableSink(descriptorProperties.asMap()); - - StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - execEnv.setParallelism(4); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv); - StreamITCase.clear(); - - DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1); - tEnv.registerDataStream("src", ds); - tEnv.registerTableSink("hbase", tableSink); - - String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src"; - tEnv.sqlUpdate(query); - - // wait to finish - tEnv.execute("HBase Job"); - - // start a batch scan job to verify contents in HBase table - ExecutionEnvironment 
env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(4); - BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig()); - - HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE); - hbaseTable.setRowKey("rowkey", Integer.class); - hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class); - hbaseTable.addColumn(FAMILY2, F2COL1, String.class); - hbaseTable.addColumn(FAMILY2, F2COL2, Long.class); - hbaseTable.addColumn(FAMILY3, F3COL1, Double.class); - hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class); - hbaseTable.addColumn(FAMILY3, F3COL3, String.class); - tableEnv.registerTableSource("hTable", hbaseTable); - - Table result = tableEnv.sqlQuery( - "SELECT " + - " h.rowkey, " + - " h.family1.col1, " + - " h.family2.col1, " + - " h.family2.col2, " + - " h.family3.col1, " + - " h.family3.col2, " + - " h.family3.col3 " + - "FROM hTable AS h" - ); - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - - String expected = - "1,10,Hello-1,100,1.01,false,Welt-1\n" + - "2,20,Hello-2,200,2.02,true,Welt-2\n" + - "3,30,Hello-3,300,3.03,false,Welt-3\n" + - "4,40,,400,4.04,true,Welt-4\n" + - "5,50,Hello-5,500,5.05,false,Welt-5\n" + - "6,60,Hello-6,600,6.06,true,Welt-6\n" + - "7,70,Hello-7,700,7.07,false,Welt-7\n" + - "8,80,,800,8.08,true,Welt-8\n"; - - TestBaseUtils.compareResultAsText(results, expected); - } -} diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java new file mode 100644 index 0000000..32baf0c --- /dev/null +++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.addons.hbase.util; + +import org.apache.flink.table.api.EnvironmentSettings; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.addons.hbase.util.PlannerType.BLINK_PLANNER; +import static org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER; + +/** + * Abstract IT case class for HBase. 
+ */ +@RunWith(Parameterized.class) +public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter { + + protected static final String TEST_TABLE_1 = "testTable1"; + protected static final String TEST_TABLE_2 = "testTable2"; + + protected static final String ROWKEY = "rk"; + protected static final String FAMILY1 = "family1"; + protected static final String F1COL1 = "col1"; + + protected static final String FAMILY2 = "family2"; + protected static final String F2COL1 = "col1"; + protected static final String F2COL2 = "col2"; + + protected static final String FAMILY3 = "family3"; + protected static final String F3COL1 = "col1"; + protected static final String F3COL2 = "col2"; + protected static final String F3COL3 = "col3"; + + private static final byte[][] FAMILIES = new byte[][]{ + Bytes.toBytes(FAMILY1), + Bytes.toBytes(FAMILY2), + Bytes.toBytes(FAMILY3) + }; + + private static final byte[][] SPLIT_KEYS = new byte[][]{Bytes.toBytes(4)}; + + @Parameterized.Parameter + public PlannerType planner; + protected EnvironmentSettings streamSettings; + protected EnvironmentSettings batchSettings; + + @Parameterized.Parameters(name = "planner = {0}") + public static PlannerType[] getPlanner() { + return new PlannerType[]{BLINK_PLANNER, OLD_PLANNER}; + } + + @BeforeClass + public static void activateHBaseCluster() throws IOException { + registerHBaseMiniClusterInClasspath(); + prepareTables(); + } + + @Before + public void before() { + EnvironmentSettings.Builder streamBuilder = EnvironmentSettings.newInstance().inStreamingMode(); + EnvironmentSettings.Builder batchBuilder = EnvironmentSettings.newInstance().inBatchMode(); + if (BLINK_PLANNER.equals(planner)) { + this.streamSettings = streamBuilder.useBlinkPlanner().build(); + this.batchSettings = batchBuilder.useBlinkPlanner().build(); + } else if (OLD_PLANNER.equals(planner)) { + this.streamSettings = streamBuilder.useOldPlanner().build(); + this.batchSettings = batchBuilder.useOldPlanner().build(); + } else { 
+ throw new IllegalArgumentException("Unsupported planner name " + planner); + } + } + + private static void prepareTables() throws IOException { + createHBaseTable1(); + createHBaseTable2(); + } + + private static void createHBaseTable1() throws IOException { + // create a table + TableName tableName = TableName.valueOf(TEST_TABLE_1); + createTable(tableName, FAMILIES, SPLIT_KEYS); + + // get the HTable instance + HTable table = openTable(tableName); + List<Put> puts = new ArrayList<>(); + // add some data + puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1")); + puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2")); + puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3")); + puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4")); + puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5")); + puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6")); + puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7")); + puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8")); + + // append rows to table + table.put(puts); + table.close(); + } + + private static void createHBaseTable2() { + // create a table + TableName tableName = TableName.valueOf(TEST_TABLE_2); + createTable(tableName, FAMILIES, SPLIT_KEYS); + } + + private static Put putRow(int rowKey, int f1c1, String f2c1, long f2c2, double f3c1, boolean f3c2, String f3c3) { + Put put = new Put(Bytes.toBytes(rowKey)); + // family 1 + put.addColumn(Bytes.toBytes(FAMILY1), Bytes.toBytes(F1COL1), Bytes.toBytes(f1c1)); + // family 2 + if (f2c1 != null) { + put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL1), Bytes.toBytes(f2c1)); + } + put.addColumn(Bytes.toBytes(FAMILY2), Bytes.toBytes(F2COL2), Bytes.toBytes(f2c2)); + // family 3 + put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL1), Bytes.toBytes(f3c1)); + put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL2), Bytes.toBytes(f3c2)); + put.addColumn(Bytes.toBytes(FAMILY3), Bytes.toBytes(F3COL3), 
Bytes.toBytes(f3c3)); + + return put; + } +} diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java similarity index 97% rename from flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java rename to flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java index 905f80d..b515c2b 100644 --- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java +++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestingClusterAutoStarter.java @@ -18,9 +18,9 @@ * limitations under the License. */ -package org.apache.flink.addons.hbase; +package org.apache.flink.addons.hbase.util; -import org.apache.flink.util.TestLogger; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,7 +42,6 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.MalformedURLException; @@ -74,9 +73,9 @@ import static org.junit.Assert.fail; // // https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java // -public class HBaseTestingClusterAutostarter extends TestLogger implements Serializable { +public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase { - private static final Log LOG = LogFactory.getLog(HBaseTestingClusterAutostarter.class); + private static final Log LOG = LogFactory.getLog(HBaseTestingClusterAutoStarter.class); private static final HBaseTestingUtility 
TEST_UTIL = new HBaseTestingUtility(); private static HBaseAdmin admin = null; @@ -160,7 +159,7 @@ public class HBaseTestingClusterAutostarter extends TestLogger implements Serial /** * Returns zookeeper quorum value contains the right port number (varies per run). */ - static String getZookeeperQuorum() { + protected static String getZookeeperQuorum() { return "localhost:" + TEST_UTIL.getZkCluster().getClientPort(); } diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java new file mode 100644 index 0000000..cd1bf39 --- /dev/null +++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/PlannerType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.addons.hbase.util; + +/** + * Planner type to use. + */ +public enum PlannerType { + BLINK_PLANNER, + OLD_PLANNER +}
