This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8fc79e674e4596be16db264b517025c26ccefcb3
Author: Jark Wu <[email protected]>
AuthorDate: Mon May 18 17:48:08 2020 +0800

    [FLINK-17797][connector/hbase] Align the behavior between the new and 
legacy HBase table source
    
    This closes #12221
---
 flink-connectors/flink-connector-hbase/pom.xml     |   8 ++
 .../connector/hbase/HBaseDynamicTableFactory.java  |  43 ++++++-
 .../hbase/source/HBaseDynamicTableSource.java      |  21 +++-
 .../flink/connector/hbase/util/HBaseSerde.java     |  24 ++--
 .../connector/hbase/util/HBaseTableSchema.java     |  12 --
 .../flink/connector/hbase/HBaseTablePlanTest.java  | 127 +++++++++++++++++++++
 .../flink/connector/hbase/HBaseTablePlanTest.xml   |  36 ++++++
 7 files changed, 242 insertions(+), 29 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase/pom.xml 
b/flink-connectors/flink-connector-hbase/pom.xml
index 90b16db..8e7a5a1 100644
--- a/flink-connectors/flink-connector-hbase/pom.xml
+++ b/flink-connectors/flink-connector-hbase/pom.xml
@@ -207,6 +207,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-clients_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
index ba85577..64b381f 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
@@ -103,15 +103,15 @@ public class HBaseDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
        public DynamicTableSource createDynamicTableSource(Context context) {
                TableFactoryHelper helper = createTableFactoryHelper(this, 
context);
                helper.validate();
+               TableSchema tableSchema = context.getCatalogTable().getSchema();
+               validatePrimaryKey(tableSchema);
+
                String hTableName = helper.getOptions().get(TABLE_NAME);
                // create default configuration from current runtime env 
(`hbase-site.xml` in classpath) first,
                Configuration hbaseClientConf = HBaseConfiguration.create();
                hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, 
helper.getOptions().get(ZOOKEEPER_QUORUM));
                hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, 
helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT));
-
                String nullStringLiteral = 
helper.getOptions().get(NULL_STRING_LITERAL);
-
-               TableSchema tableSchema = context.getCatalogTable().getSchema();
                HBaseTableSchema hbaseSchema = 
HBaseTableSchema.fromTableSchema(tableSchema);
 
                return new HBaseDynamicTableSource(
@@ -125,6 +125,9 @@ public class HBaseDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
        public DynamicTableSink createDynamicTableSink(Context context) {
                TableFactoryHelper helper = createTableFactoryHelper(this, 
context);
                helper.validate();
+               TableSchema tableSchema = context.getCatalogTable().getSchema();
+               validatePrimaryKey(tableSchema);
+
                HBaseOptions.Builder hbaseOptionsBuilder = 
HBaseOptions.builder();
                
hbaseOptionsBuilder.setTableName(helper.getOptions().get(TABLE_NAME));
                
hbaseOptionsBuilder.setZkQuorum(helper.getOptions().get(ZOOKEEPER_QUORUM));
@@ -136,10 +139,7 @@ public class HBaseDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                        .ifPresent(v -> 
writeBuilder.setBufferFlushIntervalMillis(v.toMillis()));
                helper.getOptions().getOptional(SINK_BUFFER_FLUSH_MAX_ROWS)
                        .ifPresent(writeBuilder::setBufferFlushMaxRows);
-
                String nullStringLiteral = 
helper.getOptions().get(NULL_STRING_LITERAL);
-
-               TableSchema tableSchema = context.getCatalogTable().getSchema();
                HBaseTableSchema hbaseSchema = 
HBaseTableSchema.fromTableSchema(tableSchema);
 
                return new HBaseDynamicTableSink(
@@ -172,4 +172,35 @@ public class HBaseDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                set.add(SINK_BUFFER_FLUSH_INTERVAL);
                return set;
        }
+
+       // 
------------------------------------------------------------------------------------------
+
+       /**
+        * Checks that the HBase table has a row key defined. A row key is 
defined as an atomic type,
+        * and column families and qualifiers are defined as ROW type. There 
shouldn't be multiple
+        * atomic type columns in the schema. The PRIMARY KEY constraint is 
optional; if it exists, the
+        * primary key constraint must be defined on the single row key field.
+        */
+       private static void validatePrimaryKey(TableSchema schema) {
+               HBaseTableSchema hbaseSchema = 
HBaseTableSchema.fromTableSchema(schema);
+               if (!hbaseSchema.getRowKeyName().isPresent()) {
+                       throw new IllegalArgumentException(
+                               "HBase table requires to define a row key 
field. " +
+                                       "A row key field is defined as an 
atomic type, " +
+                                       "column families and qualifiers are 
defined as ROW type.");
+               }
+               schema.getPrimaryKey().ifPresent(k -> {
+                       if (k.getColumns().size() > 1) {
+                               throw new IllegalArgumentException(
+                                       "HBase table doesn't support a primary 
Key on multiple columns. " +
+                                               "The primary key of HBase table 
must be defined on row key field.");
+                       }
+                       if 
(!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
+                               throw new IllegalArgumentException(
+                                       "Primary key of HBase table must be 
defined on the row key field. " +
+                                               "A row key field is defined as 
an atomic type, " +
+                                               "column families and qualifiers 
are defined as ROW type.");
+                       }
+               });
+       }
 }
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
index e59a12e..dcc5a5b 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
@@ -21,12 +21,15 @@ package org.apache.flink.connector.hbase.source;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
+import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.utils.TableSchemaUtils;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -36,11 +39,11 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * HBase table source implementation.
  */
 @Internal
-public class HBaseDynamicTableSource implements ScanTableSource, 
LookupTableSource {
+public class HBaseDynamicTableSource implements ScanTableSource, 
LookupTableSource, SupportsProjectionPushDown {
 
        private final Configuration conf;
        private final String tableName;
-       private final HBaseTableSchema hbaseSchema;
+       private HBaseTableSchema hbaseSchema;
        private final String nullStringLiteral;
 
        public HBaseDynamicTableSource(
@@ -78,6 +81,20 @@ public class HBaseDynamicTableSource implements 
ScanTableSource, LookupTableSour
        }
 
        @Override
+       public boolean supportsNestedProjection() {
+               // planner doesn't support nested projection push down yet.
+               return false;
+       }
+
+       @Override
+       public void applyProjection(int[][] projectedFields) {
+               TableSchema projectSchema = TableSchemaUtils.projectSchema(
+                       hbaseSchema.convertsToTableSchema(),
+                       projectedFields);
+               this.hbaseSchema = 
HBaseTableSchema.fromTableSchema(projectSchema);
+       }
+
+       @Override
        public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
        }
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index ed4a11f..e5a377f 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -67,8 +67,8 @@ public class HBaseSerde {
        private GenericRowData reusedRow;
        private GenericRowData[] reusedFamilyRows;
 
-       private final FieldEncoder keyEncoder;
-       private final FieldDecoder keyDecoder;
+       private final @Nullable FieldEncoder keyEncoder;
+       private final @Nullable FieldDecoder keyDecoder;
        private final FieldEncoder[][] qualifierEncoders;
        private final FieldDecoder[][] qualifierDecoders;
 
@@ -78,18 +78,21 @@ public class HBaseSerde {
                LogicalType rowkeyType = 
hbaseSchema.getRowKeyDataType().map(DataType::getLogicalType).orElse(null);
 
                // field length need take row key into account if it exists.
-               checkArgument(rowkeyIndex != -1 && rowkeyType != null, "row key 
is not set.");
-               this.fieldLength = families.length + 1;
+               if (rowkeyIndex != -1 && rowkeyType != null) {
+                       this.fieldLength = families.length + 1;
+                       this.keyEncoder = createFieldEncoder(rowkeyType);
+                       this.keyDecoder = createFieldDecoder(rowkeyType);
+               } else {
+                       this.fieldLength = families.length;
+                       this.keyEncoder = null;
+                       this.keyDecoder = null;
+               }
                this.nullStringBytes = 
nullStringLiteral.getBytes(StandardCharsets.UTF_8);
 
                // prepare output rows
                this.reusedRow = new GenericRowData(fieldLength);
                this.reusedFamilyRows = new GenericRowData[families.length];
 
-               // row key should never be null
-               this.keyEncoder = createFieldEncoder(rowkeyType);
-               this.keyDecoder = createFieldDecoder(rowkeyType);
-
                this.qualifiers = new byte[families.length][][];
                this.qualifierEncoders = new FieldEncoder[families.length][];
                this.qualifierDecoders = new FieldDecoder[families.length][];
@@ -115,6 +118,7 @@ public class HBaseSerde {
         * @return The appropriate instance of Put for this use case.
         */
        public @Nullable Put createPutMutation(RowData row) {
+               checkArgument(keyEncoder != null, "row key is not set.");
                byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
                if (rowkey.length == 0) {
                        // drop dirty records, rowkey shouldn't be zero length
@@ -146,6 +150,7 @@ public class HBaseSerde {
         * @return The appropriate instance of Delete for this use case.
         */
        public @Nullable Delete createDeleteMutation(RowData row) {
+               checkArgument(keyEncoder != null, "row key is not set.");
                byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
                if (rowkey.length == 0) {
                        // drop dirty records, rowkey shouldn't be zero length
@@ -189,9 +194,10 @@ public class HBaseSerde {
         * Converts HBase {@link Result} into {@link RowData}.
         */
        public RowData convertToRow(Result result) {
-               Object rowkey = keyDecoder.decode(result.getRow());
                for (int i = 0; i < fieldLength; i++) {
                        if (rowkeyIndex == i) {
+                               assert keyDecoder != null;
+                               Object rowkey = 
keyDecoder.decode(result.getRow());
                                reusedRow.setField(rowkeyIndex, rowkey);
                        } else {
                                int f = (rowkeyIndex != -1 && i > rowkeyIndex) 
? i - 1 : i;
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
index 41108f4..116b1ae 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
@@ -361,18 +361,6 @@ public class HBaseTableSchema implements Serializable {
                                        "Unsupported field type '" + fieldType 
+ "' for HBase.");
                        }
                }
-               schema.getPrimaryKey().ifPresent(k -> {
-                       if (k.getColumns().size() > 1 ||
-                                       
!hbaseSchema.getRowKeyName().isPresent() ||
-                                       
!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
-                               throw new IllegalArgumentException(
-                                       "Primary Key of HBase table should only 
be defined on the row key field.");
-                       }
-               });
-               if (!hbaseSchema.getRowKeyName().isPresent()) {
-                       throw new IllegalArgumentException(
-                               "HBase table requires to define a row key 
field. A row key field must be an atomic type.");
-               }
                return hbaseSchema;
        }
 
diff --git 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java
 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java
new file mode 100644
index 0000000..053cf99
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+
+/**
+ * Plan tests for HBase connector, for example, testing projection push down.
+ */
+public class HBaseTablePlanTest extends TableTestBase {
+
+       private final StreamTableTestUtil util = streamTestUtil(new 
TableConfig());
+
+       @Test
+       public void testMultipleRowKey() {
+               util.tableEnv().executeSql(
+                       "CREATE TABLE hTable (" +
+                               " family1 ROW<col1 INT>," +
+                               " family2 ROW<col1 STRING, col2 BIGINT>," +
+                               " rowkey INT," +
+                               " rowkey2 STRING " +
+                               ") WITH (" +
+                               " 'connector' = 'hbase-1.4'," +
+                               " 'table-name' = 'my_table'," +
+                               " 'zookeeper.quorum' = 'localhost:2021'" +
+                               ")");
+               thrown().expect(containsCause(new IllegalArgumentException("Row 
key can't be set multiple times.")));
+               util.verifyPlan("SELECT * FROM hTable");
+       }
+
+       @Test
+       public void testNoneRowKey() {
+               util.tableEnv().executeSql(
+                       "CREATE TABLE hTable (" +
+                               " family1 ROW<col1 INT>," +
+                               " family2 ROW<col1 STRING, col2 BIGINT>" +
+                               ") WITH (" +
+                               " 'connector' = 'hbase-1.4'," +
+                               " 'table-name' = 'my_table'," +
+                               " 'zookeeper.quorum' = 'localhost:2021'" +
+                               ")");
+               thrown().expect(containsCause(new IllegalArgumentException(
+                       "HBase table requires to define a row key field. " +
+                               "A row key field is defined as an atomic type, 
" +
+                               "column families and qualifiers are defined as 
ROW type.")));
+               util.verifyPlan("SELECT * FROM hTable");
+       }
+
+       @Test
+       public void testInvalidPrimaryKey() {
+               util.tableEnv().executeSql(
+                       "CREATE TABLE hTable (" +
+                               " family1 ROW<col1 INT>," +
+                               " family2 ROW<col1 STRING, col2 BIGINT>," +
+                               " rowkey STRING, " +
+                               " PRIMARY KEY (family1) NOT ENFORCED " +
+                               ") WITH (" +
+                               " 'connector' = 'hbase-1.4'," +
+                               " 'table-name' = 'my_table'," +
+                               " 'zookeeper.quorum' = 'localhost:2021'" +
+                               ")");
+               thrown().expect(containsCause(new IllegalArgumentException(
+                       "Primary key of HBase table must be defined on the row 
key field. " +
+                               "A row key field is defined as an atomic type, 
" +
+                               "column families and qualifiers are defined as 
ROW type.")));
+               util.verifyPlan("SELECT * FROM hTable");
+       }
+
+       @Test
+       public void testUnsupportedDataType() {
+               util.tableEnv().executeSql(
+                       "CREATE TABLE hTable (" +
+                               " family1 ROW<col1 INT>," +
+                               " family2 ROW<col1 STRING, col2 BIGINT>," +
+                               " col1 ARRAY<STRING>, " +
+                               " rowkey STRING, " +
+                               " PRIMARY KEY (rowkey) NOT ENFORCED " +
+                               ") WITH (" +
+                               " 'connector' = 'hbase-1.4'," +
+                               " 'table-name' = 'my_table'," +
+                               " 'zookeeper.quorum' = 'localhost:2021'" +
+                               ")");
+               thrown().expect(containsCause(new IllegalArgumentException(
+                       "Unsupported field type 'ARRAY<STRING>' for HBase.")));
+               util.verifyPlan("SELECT * FROM hTable");
+       }
+
+       @Test
+       public void testProjectionPushDown() {
+               util.tableEnv().executeSql(
+                       "CREATE TABLE hTable (" +
+                               " family1 ROW<col1 INT>," +
+                               " family2 ROW<col1 STRING, col2 BIGINT>," +
+                               " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 
STRING>," +
+                               " rowkey INT," +
+                               " PRIMARY KEY (rowkey) NOT ENFORCED" +
+                               ") WITH (" +
+                               " 'connector' = 'hbase-1.4'," +
+                               " 'table-name' = 'my_table'," +
+                               " 'zookeeper.quorum' = 'localhost:2021'" +
+                               ")");
+               util.verifyPlan("SELECT h.family3, h.family2.col2 FROM hTable 
AS h");
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml
 
b/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml
new file mode 100644
index 0000000..8391b1b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testProjectionPushDown">
+    <Resource name="sql">
+      <![CDATA[SELECT h.family3, h.family2.col2 FROM hTable AS h]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(family3=[$2], col2=[$1.col2])
++- LogicalTableScan(table=[[default_catalog, default_database, hTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[family3, family2.col2 AS col2])
++- TableSourceScan(table=[[default_catalog, default_database, hTable, 
project=[family3, family2]]], fields=[family3, family2])
+]]>
+    </Resource>
+  </TestCase>
+</Root>

Reply via email to