This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8fc79e674e4596be16db264b517025c26ccefcb3 Author: Jark Wu <[email protected]> AuthorDate: Mon May 18 17:48:08 2020 +0800 [FLINK-17797][connector/hbase] Align the behavior between the new and legacy HBase table source This closes #12221 --- flink-connectors/flink-connector-hbase/pom.xml | 8 ++ .../connector/hbase/HBaseDynamicTableFactory.java | 43 ++++++- .../hbase/source/HBaseDynamicTableSource.java | 21 +++- .../flink/connector/hbase/util/HBaseSerde.java | 24 ++-- .../connector/hbase/util/HBaseTableSchema.java | 12 -- .../flink/connector/hbase/HBaseTablePlanTest.java | 127 +++++++++++++++++++++ .../flink/connector/hbase/HBaseTablePlanTest.xml | 36 ++++++ 7 files changed, 242 insertions(+), 29 deletions(-) diff --git a/flink-connectors/flink-connector-hbase/pom.xml b/flink-connectors/flink-connector-hbase/pom.xml index 90b16db..8e7a5a1 100644 --- a/flink-connectors/flink-connector-hbase/pom.xml +++ b/flink-connectors/flink-connector-hbase/pom.xml @@ -207,6 +207,14 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java index ba85577..64b381f 100644 --- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java +++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java @@ -103,15 +103,15 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna public DynamicTableSource createDynamicTableSource(Context context) { TableFactoryHelper helper = createTableFactoryHelper(this, context); helper.validate(); + TableSchema tableSchema = context.getCatalogTable().getSchema(); + validatePrimaryKey(tableSchema); + String hTableName = helper.getOptions().get(TABLE_NAME); // create default configuration from current runtime env (`hbase-site.xml` in classpath) first, Configuration hbaseClientConf = HBaseConfiguration.create(); hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, helper.getOptions().get(ZOOKEEPER_QUORUM)); hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT)); - String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL); - - TableSchema tableSchema = context.getCatalogTable().getSchema(); HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema); return new HBaseDynamicTableSource( @@ -125,6 +125,9 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna public DynamicTableSink createDynamicTableSink(Context context) { TableFactoryHelper helper = createTableFactoryHelper(this, context); helper.validate(); + TableSchema tableSchema = context.getCatalogTable().getSchema(); + validatePrimaryKey(tableSchema); + HBaseOptions.Builder hbaseOptionsBuilder = HBaseOptions.builder(); hbaseOptionsBuilder.setTableName(helper.getOptions().get(TABLE_NAME)); hbaseOptionsBuilder.setZkQuorum(helper.getOptions().get(ZOOKEEPER_QUORUM)); @@ -136,10 +139,7 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna .ifPresent(v -> writeBuilder.setBufferFlushIntervalMillis(v.toMillis())); helper.getOptions().getOptional(SINK_BUFFER_FLUSH_MAX_ROWS) .ifPresent(writeBuilder::setBufferFlushMaxRows); - String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL); - - TableSchema tableSchema = context.getCatalogTable().getSchema(); HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema); return new HBaseDynamicTableSink( @@ -172,4 +172,35 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna set.add(SINK_BUFFER_FLUSH_INTERVAL); return set; } + + // ------------------------------------------------------------------------------------------ + + /** + * Checks that the HBase table have row key defined. A row key is defined as an atomic type, + * and column families and qualifiers are defined as ROW type. There shouldn't be multiple + * atomic type columns in the schema. The PRIMARY KEY constraint is optional, if exist, the + * primary key constraint must be defined on the single row key field. + */ + private static void validatePrimaryKey(TableSchema schema) { + HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema); + if (!hbaseSchema.getRowKeyName().isPresent()) { + throw new IllegalArgumentException( + "HBase table requires to define a row key field. " + + "A row key field is defined as an atomic type, " + + "column families and qualifiers are defined as ROW type."); + } + schema.getPrimaryKey().ifPresent(k -> { + if (k.getColumns().size() > 1) { + throw new IllegalArgumentException( + "HBase table doesn't support a primary Key on multiple columns. " + + "The primary key of HBase table must be defined on row key field."); + } + if (!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) { + throw new IllegalArgumentException( + "Primary key of HBase table must be defined on the row key field. " + + "A row key field is defined as an atomic type, " + + "column families and qualifiers are defined as ROW type."); + } + }); + } } diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java index e59a12e..dcc5a5b 100644 --- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java +++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java @@ -21,12 +21,15 @@ package org.apache.flink.connector.hbase.source; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.connector.hbase.util.HBaseTableSchema; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.TableFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.hadoop.conf.Configuration; @@ -36,11 +39,11 @@ import static org.apache.flink.util.Preconditions.checkArgument; * HBase table source implementation. */ @Internal -public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSource { +public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown { private final Configuration conf; private final String tableName; - private final HBaseTableSchema hbaseSchema; + private HBaseTableSchema hbaseSchema; private final String nullStringLiteral; public HBaseDynamicTableSource( @@ -78,6 +81,20 @@ public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSour } @Override + public boolean supportsNestedProjection() { + // planner doesn't support nested projection push down yet. + return false; + } + + @Override + public void applyProjection(int[][] projectedFields) { + TableSchema projectSchema = TableSchemaUtils.projectSchema( + hbaseSchema.convertsToTableSchema(), + projectedFields); + this.hbaseSchema = HBaseTableSchema.fromTableSchema(projectSchema); + } + + @Override public ChangelogMode getChangelogMode() { return ChangelogMode.insertOnly(); } diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java index ed4a11f..e5a377f 100644 --- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java +++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java @@ -67,8 +67,8 @@ public class HBaseSerde { private GenericRowData reusedRow; private GenericRowData[] reusedFamilyRows; - private final FieldEncoder keyEncoder; - private final FieldDecoder keyDecoder; + private final @Nullable FieldEncoder keyEncoder; + private final @Nullable FieldDecoder keyDecoder; private final FieldEncoder[][] qualifierEncoders; private final FieldDecoder[][] qualifierDecoders; @@ -78,18 +78,21 @@ public class HBaseSerde { LogicalType rowkeyType = hbaseSchema.getRowKeyDataType().map(DataType::getLogicalType).orElse(null); // field length need take row key into account if it exists. - checkArgument(rowkeyIndex != -1 && rowkeyType != null, "row key is not set."); - this.fieldLength = families.length + 1; + if (rowkeyIndex != -1 && rowkeyType != null) { + this.fieldLength = families.length + 1; + this.keyEncoder = createFieldEncoder(rowkeyType); + this.keyDecoder = createFieldDecoder(rowkeyType); + } else { + this.fieldLength = families.length; + this.keyEncoder = null; + this.keyDecoder = null; + } this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8); // prepare output rows this.reusedRow = new GenericRowData(fieldLength); this.reusedFamilyRows = new GenericRowData[families.length]; - // row key should never be null - this.keyEncoder = createFieldEncoder(rowkeyType); - this.keyDecoder = createFieldDecoder(rowkeyType); - this.qualifiers = new byte[families.length][][]; this.qualifierEncoders = new FieldEncoder[families.length][]; this.qualifierDecoders = new FieldDecoder[families.length][]; @@ -115,6 +118,7 @@ public class HBaseSerde { * @return The appropriate instance of Put for this use case. */ public @Nullable Put createPutMutation(RowData row) { + checkArgument(keyEncoder != null, "row key is not set."); byte[] rowkey = keyEncoder.encode(row, rowkeyIndex); if (rowkey.length == 0) { // drop dirty records, rowkey shouldn't be zero length @@ -146,6 +150,7 @@ public class HBaseSerde { * @return The appropriate instance of Delete for this use case. */ public @Nullable Delete createDeleteMutation(RowData row) { + checkArgument(keyEncoder != null, "row key is not set."); byte[] rowkey = keyEncoder.encode(row, rowkeyIndex); if (rowkey.length == 0) { // drop dirty records, rowkey shouldn't be zero length @@ -189,9 +194,10 @@ public class HBaseSerde { * Converts HBase {@link Result} into {@link RowData}. */ public RowData convertToRow(Result result) { - Object rowkey = keyDecoder.decode(result.getRow()); for (int i = 0; i < fieldLength; i++) { if (rowkeyIndex == i) { + assert keyDecoder != null; + Object rowkey = keyDecoder.decode(result.getRow()); reusedRow.setField(rowkeyIndex, rowkey); } else { int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i; diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java index 41108f4..116b1ae 100644 --- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java +++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java @@ -361,18 +361,6 @@ public class HBaseTableSchema implements Serializable { "Unsupported field type '" + fieldType + "' for HBase."); } } - schema.getPrimaryKey().ifPresent(k -> { - if (k.getColumns().size() > 1 || - !hbaseSchema.getRowKeyName().isPresent() || - !hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) { - throw new IllegalArgumentException( - "Primary Key of HBase table should only be defined on the row key field."); - } - }); - if (!hbaseSchema.getRowKeyName().isPresent()) { - throw new IllegalArgumentException( - "HBase table requires to define a row key field. A row key field must be an atomic type."); - } return hbaseSchema; } diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java new file mode 100644 index 0000000..053cf99 --- /dev/null +++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.hbase; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.Test; + +import static org.apache.flink.util.CoreMatchers.containsCause; + +/** + * Plan tests for HBase connector, for example, testing projection push down. + */ +public class HBaseTablePlanTest extends TableTestBase { + + private final StreamTableTestUtil util = streamTestUtil(new TableConfig()); + + @Test + public void testMultipleRowKey() { + util.tableEnv().executeSql( + "CREATE TABLE hTable (" + + " family1 ROW<col1 INT>," + + " family2 ROW<col1 STRING, col2 BIGINT>," + + " rowkey INT," + + " rowkey2 STRING " + + ") WITH (" + + " 'connector' = 'hbase-1.4'," + + " 'table-name' = 'my_table'," + + " 'zookeeper.quorum' = 'localhost:2021'" + + ")"); + thrown().expect(containsCause(new IllegalArgumentException("Row key can't be set multiple times."))); + util.verifyPlan("SELECT * FROM hTable"); + } + + @Test + public void testNoneRowKey() { + util.tableEnv().executeSql( + "CREATE TABLE hTable (" + + " family1 ROW<col1 INT>," + + " family2 ROW<col1 STRING, col2 BIGINT>" + + ") WITH (" + + " 'connector' = 'hbase-1.4'," + + " 'table-name' = 'my_table'," + + " 'zookeeper.quorum' = 'localhost:2021'" + + ")"); + thrown().expect(containsCause(new IllegalArgumentException( + "HBase table requires to define a row key field. " + + "A row key field is defined as an atomic type, " + + "column families and qualifiers are defined as ROW type."))); + util.verifyPlan("SELECT * FROM hTable"); + } + + @Test + public void testInvalidPrimaryKey() { + util.tableEnv().executeSql( + "CREATE TABLE hTable (" + + " family1 ROW<col1 INT>," + + " family2 ROW<col1 STRING, col2 BIGINT>," + + " rowkey STRING, " + + " PRIMARY KEY (family1) NOT ENFORCED " + + ") WITH (" + + " 'connector' = 'hbase-1.4'," + + " 'table-name' = 'my_table'," + + " 'zookeeper.quorum' = 'localhost:2021'" + + ")"); + thrown().expect(containsCause(new IllegalArgumentException( + "Primary key of HBase table must be defined on the row key field. " + + "A row key field is defined as an atomic type, " + + "column families and qualifiers are defined as ROW type."))); + util.verifyPlan("SELECT * FROM hTable"); + } + + @Test + public void testUnsupportedDataType() { + util.tableEnv().executeSql( + "CREATE TABLE hTable (" + + " family1 ROW<col1 INT>," + + " family2 ROW<col1 STRING, col2 BIGINT>," + + " col1 ARRAY<STRING>, " + + " rowkey STRING, " + + " PRIMARY KEY (rowkey) NOT ENFORCED " + + ") WITH (" + + " 'connector' = 'hbase-1.4'," + + " 'table-name' = 'my_table'," + + " 'zookeeper.quorum' = 'localhost:2021'" + + ")"); + thrown().expect(containsCause(new IllegalArgumentException( + "Unsupported field type 'ARRAY<STRING>' for HBase."))); + util.verifyPlan("SELECT * FROM hTable"); + } + + @Test + public void testProjectionPushDown() { + util.tableEnv().executeSql( + "CREATE TABLE hTable (" + + " family1 ROW<col1 INT>," + + " family2 ROW<col1 STRING, col2 BIGINT>," + + " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>," + + " rowkey INT," + + " PRIMARY KEY (rowkey) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'hbase-1.4'," + + " 'table-name' = 'my_table'," + + " 'zookeeper.quorum' = 'localhost:2021'" + + ")"); + util.verifyPlan("SELECT h.family3, h.family2.col2 FROM hTable AS h"); + } + +} diff --git a/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml b/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml new file mode 100644 index 0000000..8391b1b --- /dev/null +++ b/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<Root> + <TestCase name="testProjectionPushDown"> + <Resource name="sql"> + <![CDATA[SELECT h.family3, h.family2.col2 FROM hTable AS h]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(family3=[$2], col2=[$1.col2]) ++- LogicalTableScan(table=[[default_catalog, default_database, hTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[family3, family2.col2 AS col2]) ++- TableSourceScan(table=[[default_catalog, default_database, hTable, project=[family3, family2]]], fields=[family3, family2]) +]]> + </Resource> + </TestCase> +</Root>
