This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cd209bd547a9db4d30817d61f3b80a9323451506
Author: Fabian Paul <[email protected]>
AuthorDate: Tue Oct 12 13:24:39 2021 +0200

    [FLINK-24397][connectors/hbase] Remove TableSchema usage from Hbase table 
connector
---
 .../hbase1/HBase1DynamicTableFactory.java          | 13 ++--
 .../hbase2/HBase2DynamicTableFactory.java          | 13 ++--
 .../hbase2/source/HBaseDynamicTableSource.java     |  9 ++-
 .../HBaseRowDataAsyncLookupFunctionTest.java       | 35 +++++-----
 .../source/AbstractHBaseDynamicTableSource.java    | 18 ++---
 .../hbase/table/HBaseConnectorOptionsUtil.java     | 38 ++++++-----
 .../connector/hbase/util/HBaseTableSchema.java     | 76 ++++++----------------
 .../flink/connector/hbase/util/HBaseSerdeTest.java | 35 +++++-----
 8 files changed, 99 insertions(+), 138 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
 
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
index 6f825fb..3454064 100644
--- 
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
+++ 
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -70,15 +69,15 @@ public class HBase1DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), 
context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseClientConf = getHBaseConfiguration(options);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = 
HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSource(
                 hbaseClientConf,
@@ -95,16 +94,16 @@ public class HBase1DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), 
context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseConf = getHBaseConfiguration(options);
         HBaseWriteOptions hBaseWriteOptions = 
getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = 
HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSink(
                 tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, 
nullStringLiteral);
diff --git 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
index 7ba42af..c9246b6 100644
--- 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
+++ 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -72,16 +71,16 @@ public class HBase2DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), 
context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseConf = getHBaseConfiguration(options);
         HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = 
HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSource(
                 hbaseConf, tableName, hbaseSchema, nullStringLiteral, 
lookupOptions);
@@ -94,16 +93,16 @@ public class HBase2DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        TableSchema tableSchema = context.getCatalogTable().getSchema();
         Map<String, String> options = context.getCatalogTable().getOptions();
 
-        validatePrimaryKey(tableSchema);
+        validatePrimaryKey(context.getPhysicalRowDataType(), 
context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseConf = getHBaseConfiguration(options);
         HBaseWriteOptions hBaseWriteOptions = 
getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
-        HBaseTableSchema hbaseSchema = 
HBaseTableSchema.fromTableSchema(tableSchema);
+        HBaseTableSchema hbaseSchema =
+                
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
         return new HBaseDynamicTableSink(
                 tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, 
nullStringLiteral);
diff --git 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
index d94b03d..806455c 100644
--- 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
+++ 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -56,11 +57,9 @@ public class HBaseDynamicTableSource extends 
AbstractHBaseDynamicTableSource {
                 hbaseSchema.getRowKeyName().isPresent(),
                 "HBase schema must have a row key when used in lookup mode.");
         checkArgument(
-                hbaseSchema
-                        .convertsToTableSchema()
-                        .getTableColumn(context.getKeys()[0][0])
-                        .filter(f -> 
f.getName().equals(hbaseSchema.getRowKeyName().get()))
-                        .isPresent(),
+                DataType.getFieldNames(hbaseSchema.convertToDataType())
+                        .get(context.getKeys()[0][0])
+                        .equals(hbaseSchema.getRowKeyName().get()),
                 "Currently, HBase table only supports lookup by rowkey 
field.");
         if (lookupOptions.getLookupAsync()) {
             return AsyncTableFunctionProvider.of(
diff --git 
a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
 
b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
index 1e7f522..2f8cba6 100644
--- 
a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
+++ 
b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
@@ -22,8 +22,8 @@ import 
org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase2.util.HBaseTestBase;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
@@ -38,6 +38,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -98,23 +104,18 @@ public class HBaseRowDataAsyncLookupFunctionTest extends 
HBaseTestBase {
             lookupOptions =
                     
HBaseLookupOptions.builder().setCacheMaxSize(4).setCacheExpireMs(10000).build();
         }
-        TableSchema schema =
-                TableSchema.builder()
-                        .field(ROW_KEY, DataTypes.INT())
-                        .field(FAMILY1, DataTypes.ROW(DataTypes.FIELD(F1COL1, 
DataTypes.INT())))
-                        .field(
-                                FAMILY2,
-                                DataTypes.ROW(
-                                        DataTypes.FIELD(F2COL1, 
DataTypes.STRING()),
-                                        DataTypes.FIELD(F2COL2, 
DataTypes.BIGINT())))
-                        .field(
+        DataType dataType =
+                ROW(
+                        FIELD(ROW_KEY, INT()),
+                        FIELD(FAMILY1, ROW(FIELD(F1COL1, INT()))),
+                        FIELD(FAMILY2, ROW(FIELD(F2COL1, STRING()), 
FIELD(F2COL2, BIGINT()))),
+                        FIELD(
                                 FAMILY3,
-                                DataTypes.ROW(
-                                        DataTypes.FIELD(F3COL1, 
DataTypes.DOUBLE()),
-                                        DataTypes.FIELD(F3COL2, 
DataTypes.BOOLEAN()),
-                                        DataTypes.FIELD(F3COL3, 
DataTypes.STRING())))
-                        .build();
-        HBaseTableSchema hbaseSchema = 
HBaseTableSchema.fromTableSchema(schema);
+                                ROW(
+                                        FIELD(F3COL1, DOUBLE()),
+                                        FIELD(F3COL2, DataTypes.BOOLEAN()),
+                                        FIELD(F3COL3, STRING()))));
+        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
         return new HBaseRowDataAsyncLookupFunction(
                 getConf(), TEST_TABLE_1, hbaseSchema, "null", lookupOptions);
     }
diff --git 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
index 87a5bdb..e1806ef 100644
--- 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
+++ 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
@@ -31,7 +30,7 @@ import 
org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -77,11 +76,9 @@ public abstract class AbstractHBaseDynamicTableSource
                 hbaseSchema.getRowKeyName().isPresent(),
                 "HBase schema must have a row key when used in lookup mode.");
         checkArgument(
-                hbaseSchema
-                        .convertsToTableSchema()
-                        .getTableColumn(context.getKeys()[0][0])
-                        .filter(f -> 
f.getName().equals(hbaseSchema.getRowKeyName().get()))
-                        .isPresent(),
+                DataType.getFieldNames(hbaseSchema.convertToDataType())
+                        .get(context.getKeys()[0][0])
+                        .equals(hbaseSchema.getRowKeyName().get()),
                 "Currently, HBase table only supports lookup by rowkey 
field.");
 
         return TableFunctionProvider.of(
@@ -97,10 +94,9 @@ public abstract class AbstractHBaseDynamicTableSource
 
     @Override
     public void applyProjection(int[][] projectedFields) {
-        TableSchema projectSchema =
-                TableSchemaUtils.projectSchema(
-                        hbaseSchema.convertsToTableSchema(), projectedFields);
-        this.hbaseSchema = HBaseTableSchema.fromTableSchema(projectSchema);
+        this.hbaseSchema =
+                HBaseTableSchema.fromDataType(
+                        
DataType.projectFields(hbaseSchema.convertToDataType(), projectedFields));
     }
 
     @Override
diff --git 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
index 9ec1c06..6b9804f 100644
--- 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
+++ 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -57,29 +57,31 @@ public class HBaseConnectorOptionsUtil {
      * type columns in the schema. The PRIMARY KEY constraint is optional, if 
exist, the primary key
      * constraint must be defined on the single row key field.
      */
-    public static void validatePrimaryKey(TableSchema schema) {
-        HBaseTableSchema hbaseSchema = 
HBaseTableSchema.fromTableSchema(schema);
+    public static void validatePrimaryKey(DataType dataType, int[] 
primaryKeyIndexes) {
+        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
         if (!hbaseSchema.getRowKeyName().isPresent()) {
             throw new IllegalArgumentException(
                     "HBase table requires to define a row key field. "
                             + "A row key field is defined as an atomic type, "
                             + "column families and qualifiers are defined as 
ROW type.");
         }
-        schema.getPrimaryKey()
-                .ifPresent(
-                        k -> {
-                            if (k.getColumns().size() > 1) {
-                                throw new IllegalArgumentException(
-                                        "HBase table doesn't support a primary 
Key on multiple columns. "
-                                                + "The primary key of HBase 
table must be defined on row key field.");
-                            }
-                            if 
(!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
-                                throw new IllegalArgumentException(
-                                        "Primary key of HBase table must be 
defined on the row key field. "
-                                                + "A row key field is defined 
as an atomic type, "
-                                                + "column families and 
qualifiers are defined as ROW type.");
-                            }
-                        });
+        if (primaryKeyIndexes.length == 0) {
+            return;
+        }
+        if (primaryKeyIndexes.length > 1) {
+            throw new IllegalArgumentException(
+                    "HBase table doesn't support a primary Key on multiple 
columns. "
+                            + "The primary key of HBase table must be defined 
on row key field.");
+        }
+        if (!hbaseSchema
+                .getRowKeyName()
+                .get()
+                
.equals(DataType.getFieldNames(dataType).get(primaryKeyIndexes[0]))) {
+            throw new IllegalArgumentException(
+                    "Primary key of HBase table must be defined on the row key 
field. "
+                            + "A row key field is defined as an atomic type, "
+                            + "column families and qualifiers are defined as 
ROW type.");
+        }
     }
 
     public static HBaseWriteOptions getHBaseWriteOptions(ReadableConfig 
tableOptions) {
diff --git 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
index 81342f3..d43828a 100644
--- 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
+++ 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -30,6 +29,8 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Streams;
+
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.util.Arrays;
@@ -200,20 +201,6 @@ public class HBaseTableSchema implements Serializable {
         return qualifierKeys;
     }
 
-    /**
-     * Returns the types of all registered column qualifiers of a specific 
column family.
-     *
-     * @param family The name of the column family for which the column 
qualifier types are
-     *     returned.
-     * @return The types of all registered column qualifiers of a specific 
column family.
-     */
-    public TypeInformation<?>[] getQualifierTypes(String family) {
-        DataType[] dataTypes = getQualifierDataTypes(family);
-        return Arrays.stream(dataTypes)
-                .map(TypeConversions::fromDataTypeToLegacyInfo)
-                .toArray(TypeInformation[]::new);
-    }
-
     public DataType[] getQualifierDataTypes(String family) {
         Map<String, DataType> qualifierMap = familyMap.get(family);
 
@@ -241,15 +228,6 @@ public class HBaseTableSchema implements Serializable {
         return familyMap.get(family);
     }
 
-    /**
-     * Returns the charset for value strings and HBase identifiers.
-     *
-     * @return The charset for value strings and HBase identifiers.
-     */
-    public String getStringCharset() {
-        return this.charset;
-    }
-
     /** Returns field index of row key in the table schema. Returns -1 if row 
key is not set. */
     public int getRowKeyIndex() {
         return rowKeyInfo == null ? -1 : rowKeyInfo.rowKeyIndex;
@@ -274,39 +252,15 @@ public class HBaseTableSchema implements Serializable {
         return rowKeyInfo == null ? Optional.empty() : 
Optional.of(rowKeyInfo.rowKeyName);
     }
 
-    /** Gets a new hbase schema with the selected fields. */
-    public HBaseTableSchema getProjectedHBaseTableSchema(int[] 
projectedFields) {
-        if (projectedFields == null) {
-            return this;
-        }
-        HBaseTableSchema newSchema = new HBaseTableSchema();
-        String[] fieldNames = convertsToTableSchema().getFieldNames();
-        for (int projectedField : projectedFields) {
-            String name = fieldNames[projectedField];
-            if (rowKeyInfo != null && name.equals(rowKeyInfo.rowKeyName)) {
-                newSchema.setRowKey(rowKeyInfo.rowKeyName, 
rowKeyInfo.rowKeyType);
-            } else {
-                Map<String, DataType> familyInfo = getFamilyInfo(name);
-                for (Map.Entry<String, DataType> entry : 
familyInfo.entrySet()) {
-                    // create the newSchema
-                    String qualifier = entry.getKey();
-                    newSchema.addColumn(name, qualifier, entry.getValue());
-                }
-            }
-        }
-        newSchema.setCharset(charset);
-        return newSchema;
-    }
-
     /**
-     * Converts this {@link HBaseTableSchema} to {@link TableSchema}, the 
fields are consisted of
+     * Converts this {@link HBaseTableSchema} to {@link DataType}, the fields 
are consisted of
      * families and rowkey, the order is in the definition order (i.e. calling 
{@link
      * #addColumn(String, String, Class)} and {@link #setRowKey(String, 
Class)}). The family field
      * is a composite type which is consisted of qualifiers.
      *
-     * @return the {@link TableSchema} derived from the {@link 
HBaseTableSchema}.
+     * @return the {@link DataType} derived from the {@link HBaseTableSchema}.
      */
-    public TableSchema convertsToTableSchema() {
+    public DataType convertToDataType() {
         String[] familyNames = getFamilyNames();
         if (rowKeyInfo != null) {
             String[] fieldNames = new String[familyNames.length + 1];
@@ -324,7 +278,12 @@ public class HBaseTableSchema implements Serializable {
                                     getQualifierNames(family), 
getQualifierDataTypes(family));
                 }
             }
-            return TableSchema.builder().fields(fieldNames, 
fieldTypes).build();
+            return DataTypes.ROW(
+                    Streams.zip(
+                                    Arrays.stream(fieldNames),
+                                    Arrays.stream(fieldTypes),
+                                    DataTypes::FIELD)
+                            .toArray(DataTypes.Field[]::new));
         } else {
             String[] fieldNames = new String[familyNames.length];
             DataType[] fieldTypes = new DataType[familyNames.length];
@@ -334,7 +293,12 @@ public class HBaseTableSchema implements Serializable {
                 fieldTypes[i] =
                         getRowDataType(getQualifierNames(family), 
getQualifierDataTypes(family));
             }
-            return TableSchema.builder().fields(fieldNames, 
fieldTypes).build();
+            return DataTypes.ROW(
+                    Streams.zip(
+                                    Arrays.stream(fieldNames),
+                                    Arrays.stream(fieldTypes),
+                                    DataTypes::FIELD)
+                            .toArray(DataTypes.Field[]::new));
         }
     }
 
@@ -354,10 +318,10 @@ public class HBaseTableSchema implements Serializable {
         return DataTypes.ROW(fields);
     }
 
-    /** Construct a {@link HBaseTableSchema} from a {@link TableSchema}. */
-    public static HBaseTableSchema fromTableSchema(TableSchema schema) {
+    /** Construct a {@link HBaseTableSchema} from a {@link DataType}. */
+    public static HBaseTableSchema fromDataType(DataType physicalRowType) {
         HBaseTableSchema hbaseSchema = new HBaseTableSchema();
-        RowType rowType = (RowType) 
schema.toPhysicalRowDataType().getLogicalType();
+        RowType rowType = (RowType) physicalRowType.getLogicalType();
         for (RowType.RowField field : rowType.getFields()) {
             LogicalType fieldType = field.getType();
             if (fieldType.getTypeRoot() == LogicalTypeRoot.ROW) {
diff --git 
a/flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
 
b/flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
index 5606f8a..81189bd 100644
--- 
a/flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
+++ 
b/flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.connector.hbase.util;
 
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
@@ -31,6 +31,12 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -95,23 +101,18 @@ public class HBaseSerdeTest {
     }
 
     private HBaseSerde createHBaseSerde() {
-        TableSchema schema =
-                TableSchema.builder()
-                        .field(ROW_KEY, DataTypes.INT())
-                        .field(FAMILY1, DataTypes.ROW(DataTypes.FIELD(F1COL1, 
DataTypes.INT())))
-                        .field(
-                                FAMILY2,
-                                DataTypes.ROW(
-                                        DataTypes.FIELD(F2COL1, 
DataTypes.STRING()),
-                                        DataTypes.FIELD(F2COL2, 
DataTypes.BIGINT())))
-                        .field(
+        DataType dataType =
+                ROW(
+                        FIELD(ROW_KEY, INT()),
+                        FIELD(FAMILY1, ROW(FIELD(F1COL1, INT()))),
+                        FIELD(FAMILY2, ROW(FIELD(F2COL1, STRING()), 
FIELD(F2COL2, BIGINT()))),
+                        FIELD(
                                 FAMILY3,
-                                DataTypes.ROW(
-                                        DataTypes.FIELD(F3COL1, 
DataTypes.DOUBLE()),
-                                        DataTypes.FIELD(F3COL2, 
DataTypes.BOOLEAN()),
-                                        DataTypes.FIELD(F3COL3, 
DataTypes.STRING())))
-                        .build();
-        HBaseTableSchema hbaseSchema = 
HBaseTableSchema.fromTableSchema(schema);
+                                ROW(
+                                        FIELD(F3COL1, DOUBLE()),
+                                        FIELD(F3COL2, DataTypes.BOOLEAN()),
+                                        FIELD(F3COL3, STRING()))));
+        HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
         return new HBaseSerde(hbaseSchema, "null");
     }
 

Reply via email to