This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cd209bd547a9db4d30817d61f3b80a9323451506 Author: Fabian Paul <[email protected]> AuthorDate: Tue Oct 12 13:24:39 2021 +0200 [FLINK-24397][connectors/hbase] Remove TableSchema usage from Hbase table connector --- .../hbase1/HBase1DynamicTableFactory.java | 13 ++-- .../hbase2/HBase2DynamicTableFactory.java | 13 ++-- .../hbase2/source/HBaseDynamicTableSource.java | 9 ++- .../HBaseRowDataAsyncLookupFunctionTest.java | 35 +++++----- .../source/AbstractHBaseDynamicTableSource.java | 18 ++--- .../hbase/table/HBaseConnectorOptionsUtil.java | 38 ++++++----- .../connector/hbase/util/HBaseTableSchema.java | 76 ++++++---------------- .../flink/connector/hbase/util/HBaseSerdeTest.java | 35 +++++----- 8 files changed, 99 insertions(+), 138 deletions(-) diff --git a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java index 6f825fb..3454064 100644 --- a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java +++ b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java @@ -25,7 +25,6 @@ import org.apache.flink.connector.hbase.options.HBaseWriteOptions; import org.apache.flink.connector.hbase.util.HBaseTableSchema; import org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink; import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSinkFactory; @@ -70,15 +69,15 @@ public class HBase1DynamicTableFactory final ReadableConfig tableOptions = helper.getOptions(); - TableSchema tableSchema = context.getCatalogTable().getSchema(); Map<String, String> options = context.getCatalogTable().getOptions(); - validatePrimaryKey(tableSchema); + validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes()); String tableName = tableOptions.get(TABLE_NAME); Configuration hbaseClientConf = getHBaseConfiguration(options); String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema); + HBaseTableSchema hbaseSchema = + HBaseTableSchema.fromDataType(context.getPhysicalRowDataType()); return new HBaseDynamicTableSource( hbaseClientConf, @@ -95,16 +94,16 @@ public class HBase1DynamicTableFactory final ReadableConfig tableOptions = helper.getOptions(); - TableSchema tableSchema = context.getCatalogTable().getSchema(); Map<String, String> options = context.getCatalogTable().getOptions(); - validatePrimaryKey(tableSchema); + validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes()); String tableName = tableOptions.get(TABLE_NAME); Configuration hbaseConf = getHBaseConfiguration(options); HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions); String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema); + HBaseTableSchema hbaseSchema = + HBaseTableSchema.fromDataType(context.getPhysicalRowDataType()); return new HBaseDynamicTableSink( tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral); diff --git a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java index 7ba42af..c9246b6 100644 --- a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java +++ b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java @@ -26,7 +26,6 @@ import org.apache.flink.connector.hbase.options.HBaseWriteOptions; import org.apache.flink.connector.hbase.util.HBaseTableSchema; import org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink; import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSinkFactory; @@ -72,16 +71,16 @@ public class HBase2DynamicTableFactory final ReadableConfig tableOptions = helper.getOptions(); - TableSchema tableSchema = context.getCatalogTable().getSchema(); Map<String, String> options = context.getCatalogTable().getOptions(); - validatePrimaryKey(tableSchema); + validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes()); String tableName = tableOptions.get(TABLE_NAME); Configuration hbaseConf = getHBaseConfiguration(options); HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions); String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema); + HBaseTableSchema hbaseSchema = + HBaseTableSchema.fromDataType(context.getPhysicalRowDataType()); return new HBaseDynamicTableSource( hbaseConf, tableName, hbaseSchema, nullStringLiteral, lookupOptions); @@ -94,16 +93,16 @@ public class HBase2DynamicTableFactory final ReadableConfig tableOptions = helper.getOptions(); - TableSchema tableSchema = context.getCatalogTable().getSchema(); Map<String, String> options = context.getCatalogTable().getOptions(); - validatePrimaryKey(tableSchema); + validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes()); String tableName = tableOptions.get(TABLE_NAME); Configuration hbaseConf = getHBaseConfiguration(options); HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions); String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema); + HBaseTableSchema hbaseSchema = + HBaseTableSchema.fromDataType(context.getPhysicalRowDataType()); return new HBaseDynamicTableSink( tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral); diff --git a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java index d94b03d..806455c 100644 --- a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java +++ b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java @@ -29,6 +29,7 @@ import org.apache.flink.table.connector.source.AsyncTableFunctionProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.TableFunctionProvider; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; import org.apache.hadoop.conf.Configuration; @@ -56,11 +57,9 @@ public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource { hbaseSchema.getRowKeyName().isPresent(), "HBase schema must have a row key when used in lookup mode."); checkArgument( - hbaseSchema - .convertsToTableSchema() - .getTableColumn(context.getKeys()[0][0]) - .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get())) - .isPresent(), + DataType.getFieldNames(hbaseSchema.convertToDataType()) + .get(context.getKeys()[0][0]) + .equals(hbaseSchema.getRowKeyName().get()), "Currently, HBase table only supports lookup by rowkey field."); if (lookupOptions.getLookupAsync()) { return AsyncTableFunctionProvider.of( diff --git a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java index 1e7f522..2f8cba6 100644 --- a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java +++ b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java @@ -22,8 +22,8 @@ import org.apache.flink.connector.hbase.options.HBaseLookupOptions; import org.apache.flink.connector.hbase.util.HBaseTableSchema; import org.apache.flink.connector.hbase2.util.HBaseTestBase; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; @@ -38,6 +38,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; +import static org.apache.flink.table.api.DataTypes.BIGINT; +import static org.apache.flink.table.api.DataTypes.DOUBLE; +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.STRING; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -98,23 +104,18 @@ public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase { lookupOptions = HBaseLookupOptions.builder().setCacheMaxSize(4).setCacheExpireMs(10000).build(); } - TableSchema schema = - TableSchema.builder() - .field(ROW_KEY, DataTypes.INT()) - .field(FAMILY1, DataTypes.ROW(DataTypes.FIELD(F1COL1, DataTypes.INT()))) - .field( - FAMILY2, - DataTypes.ROW( - DataTypes.FIELD(F2COL1, DataTypes.STRING()), - DataTypes.FIELD(F2COL2, DataTypes.BIGINT()))) - .field( + DataType dataType = + ROW( + FIELD(ROW_KEY, INT()), + FIELD(FAMILY1, ROW(FIELD(F1COL1, INT()))), + FIELD(FAMILY2, ROW(FIELD(F2COL1, STRING()), FIELD(F2COL2, BIGINT()))), + FIELD( FAMILY3, - DataTypes.ROW( - DataTypes.FIELD(F3COL1, DataTypes.DOUBLE()), - DataTypes.FIELD(F3COL2, DataTypes.BOOLEAN()), - DataTypes.FIELD(F3COL3, DataTypes.STRING()))) - .build(); - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema); + ROW( + FIELD(F3COL1, DOUBLE()), + FIELD(F3COL2, DataTypes.BOOLEAN()), + FIELD(F3COL3, STRING())))); + HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType); return new HBaseRowDataAsyncLookupFunction( getConf(), TEST_TABLE_1, hbaseSchema, "null", lookupOptions); } diff --git a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java index 87a5bdb..e1806ef 100644 --- a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java +++ b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java @@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.connector.hbase.options.HBaseLookupOptions; import org.apache.flink.connector.hbase.util.HBaseTableSchema; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.LookupTableSource; @@ -31,7 +30,7 @@ import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.TableFunctionProvider; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.table.types.DataType; import org.apache.hadoop.conf.Configuration; @@ -77,11 +76,9 @@ public abstract class AbstractHBaseDynamicTableSource hbaseSchema.getRowKeyName().isPresent(), "HBase schema must have a row key when used in lookup mode."); checkArgument( - hbaseSchema - .convertsToTableSchema() - .getTableColumn(context.getKeys()[0][0]) - .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get())) - .isPresent(), + DataType.getFieldNames(hbaseSchema.convertToDataType()) + .get(context.getKeys()[0][0]) + .equals(hbaseSchema.getRowKeyName().get()), "Currently, HBase table only supports lookup by rowkey field."); return TableFunctionProvider.of( @@ -97,10 +94,9 @@ public abstract class AbstractHBaseDynamicTableSource @Override public void applyProjection(int[][] projectedFields) { - TableSchema projectSchema = - TableSchemaUtils.projectSchema( - hbaseSchema.convertsToTableSchema(), projectedFields); - this.hbaseSchema = HBaseTableSchema.fromTableSchema(projectSchema); + this.hbaseSchema = + HBaseTableSchema.fromDataType( + DataType.projectFields(hbaseSchema.convertToDataType(), projectedFields)); } @Override diff --git a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java index 9ec1c06..6b9804f 100644 --- a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java +++ b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java @@ -24,7 +24,7 @@ import org.apache.flink.connector.hbase.options.HBaseLookupOptions; import org.apache.flink.connector.hbase.options.HBaseWriteOptions; import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; import org.apache.flink.connector.hbase.util.HBaseTableSchema; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.types.DataType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -57,29 +57,31 @@ public class HBaseConnectorOptionsUtil { * type columns in the schema. The PRIMARY KEY constraint is optional, if exist, the primary key * constraint must be defined on the single row key field. */ - public static void validatePrimaryKey(TableSchema schema) { - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema); + public static void validatePrimaryKey(DataType dataType, int[] primaryKeyIndexes) { + HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType); if (!hbaseSchema.getRowKeyName().isPresent()) { throw new IllegalArgumentException( "HBase table requires to define a row key field. " + "A row key field is defined as an atomic type, " + "column families and qualifiers are defined as ROW type."); } - schema.getPrimaryKey() - .ifPresent( - k -> { - if (k.getColumns().size() > 1) { - throw new IllegalArgumentException( - "HBase table doesn't support a primary Key on multiple columns. " - + "The primary key of HBase table must be defined on row key field."); - } - if (!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) { - throw new IllegalArgumentException( - "Primary key of HBase table must be defined on the row key field. " - + "A row key field is defined as an atomic type, " - + "column families and qualifiers are defined as ROW type."); - } - }); + if (primaryKeyIndexes.length == 0) { + return; + } + if (primaryKeyIndexes.length > 1) { + throw new IllegalArgumentException( + "HBase table doesn't support a primary Key on multiple columns. " + + "The primary key of HBase table must be defined on row key field."); + } + if (!hbaseSchema + .getRowKeyName() + .get() + .equals(DataType.getFieldNames(dataType).get(primaryKeyIndexes[0]))) { + throw new IllegalArgumentException( + "Primary key of HBase table must be defined on the row key field. " + + "A row key field is defined as an atomic type, " + + "column families and qualifiers are defined as ROW type."); + } } public static HBaseWriteOptions getHBaseWriteOptions(ReadableConfig tableOptions) { diff --git a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java index 81342f3..d43828a 100644 --- a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java +++ b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; @@ -30,6 +29,8 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.util.Preconditions; +import org.apache.flink.shaded.guava30.com.google.common.collect.Streams; + import java.io.Serializable; import java.nio.charset.Charset; import java.util.Arrays; @@ -200,20 +201,6 @@ public class HBaseTableSchema implements Serializable { return qualifierKeys; } - /** - * Returns the types of all registered column qualifiers of a specific column family. - * - * @param family The name of the column family for which the column qualifier types are - * returned. - * @return The types of all registered column qualifiers of a specific column family. - */ - public TypeInformation<?>[] getQualifierTypes(String family) { - DataType[] dataTypes = getQualifierDataTypes(family); - return Arrays.stream(dataTypes) - .map(TypeConversions::fromDataTypeToLegacyInfo) - .toArray(TypeInformation[]::new); - } - public DataType[] getQualifierDataTypes(String family) { Map<String, DataType> qualifierMap = familyMap.get(family); @@ -241,15 +228,6 @@ public class HBaseTableSchema implements Serializable { return familyMap.get(family); } - /** - * Returns the charset for value strings and HBase identifiers. - * - * @return The charset for value strings and HBase identifiers. - */ - public String getStringCharset() { - return this.charset; - } - /** Returns field index of row key in the table schema. Returns -1 if row key is not set. */ public int getRowKeyIndex() { return rowKeyInfo == null ? -1 : rowKeyInfo.rowKeyIndex; @@ -274,39 +252,15 @@ public class HBaseTableSchema implements Serializable { return rowKeyInfo == null ? Optional.empty() : Optional.of(rowKeyInfo.rowKeyName); } - /** Gets a new hbase schema with the selected fields. */ - public HBaseTableSchema getProjectedHBaseTableSchema(int[] projectedFields) { - if (projectedFields == null) { - return this; - } - HBaseTableSchema newSchema = new HBaseTableSchema(); - String[] fieldNames = convertsToTableSchema().getFieldNames(); - for (int projectedField : projectedFields) { - String name = fieldNames[projectedField]; - if (rowKeyInfo != null && name.equals(rowKeyInfo.rowKeyName)) { - newSchema.setRowKey(rowKeyInfo.rowKeyName, rowKeyInfo.rowKeyType); - } else { - Map<String, DataType> familyInfo = getFamilyInfo(name); - for (Map.Entry<String, DataType> entry : familyInfo.entrySet()) { - // create the newSchema - String qualifier = entry.getKey(); - newSchema.addColumn(name, qualifier, entry.getValue()); - } - } - } - newSchema.setCharset(charset); - return newSchema; - } - /** - * Converts this {@link HBaseTableSchema} to {@link TableSchema}, the fields are consisted of + * Converts this {@link HBaseTableSchema} to {@link DataType}, the fields are consisted of * families and rowkey, the order is in the definition order (i.e. calling {@link * #addColumn(String, String, Class)} and {@link #setRowKey(String, Class)}). The family field * is a composite type which is consisted of qualifiers. * - * @return the {@link TableSchema} derived from the {@link HBaseTableSchema}. + * @return the {@link DataType} derived from the {@link HBaseTableSchema}. */ - public TableSchema convertsToTableSchema() { + public DataType convertToDataType() { String[] familyNames = getFamilyNames(); if (rowKeyInfo != null) { String[] fieldNames = new String[familyNames.length + 1]; @@ -324,7 +278,12 @@ public class HBaseTableSchema implements Serializable { getQualifierNames(family), getQualifierDataTypes(family)); } } - return TableSchema.builder().fields(fieldNames, fieldTypes).build(); + return DataTypes.ROW( + Streams.zip( + Arrays.stream(fieldNames), + Arrays.stream(fieldTypes), + DataTypes::FIELD) + .toArray(DataTypes.Field[]::new)); } else { String[] fieldNames = new String[familyNames.length]; DataType[] fieldTypes = new DataType[familyNames.length]; @@ -334,7 +293,12 @@ public class HBaseTableSchema implements Serializable { fieldTypes[i] = getRowDataType(getQualifierNames(family), getQualifierDataTypes(family)); } - return TableSchema.builder().fields(fieldNames, fieldTypes).build(); + return DataTypes.ROW( + Streams.zip( + Arrays.stream(fieldNames), + Arrays.stream(fieldTypes), + DataTypes::FIELD) + .toArray(DataTypes.Field[]::new)); } } @@ -354,10 +318,10 @@ public class HBaseTableSchema implements Serializable { return DataTypes.ROW(fields); } - /** Construct a {@link HBaseTableSchema} from a {@link TableSchema}. */ - public static HBaseTableSchema fromTableSchema(TableSchema schema) { + /** Construct a {@link HBaseTableSchema} from a {@link DataType}. */ + public static HBaseTableSchema fromDataType(DataType physicalRowType) { HBaseTableSchema hbaseSchema = new HBaseTableSchema(); - RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType(); + RowType rowType = (RowType) physicalRowType.getLogicalType(); for (RowType.RowField field : rowType.getFields()) { LogicalType fieldType = field.getType(); if (fieldType.getTypeRoot() == LogicalTypeRoot.ROW) { diff --git a/flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java b/flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java index 5606f8a..81189bd 100644 --- a/flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java +++ b/flink-connectors/flink-connector-hbase-base/src/test/java/org/apache/flink/connector/hbase/util/HBaseSerdeTest.java @@ -19,8 +19,8 @@ package org.apache.flink.connector.hbase.util; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; @@ -31,6 +31,12 @@ import org.junit.Test; import java.util.ArrayList; import java.util.List; +import static org.apache.flink.table.api.DataTypes.BIGINT; +import static org.apache.flink.table.api.DataTypes.DOUBLE; +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.STRING; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -95,23 +101,18 @@ public class HBaseSerdeTest { } private HBaseSerde createHBaseSerde() { - TableSchema schema = - TableSchema.builder() - .field(ROW_KEY, DataTypes.INT()) - .field(FAMILY1, DataTypes.ROW(DataTypes.FIELD(F1COL1, DataTypes.INT()))) - .field( - FAMILY2, - DataTypes.ROW( - DataTypes.FIELD(F2COL1, DataTypes.STRING()), - DataTypes.FIELD(F2COL2, DataTypes.BIGINT()))) - .field( + DataType dataType = + ROW( + FIELD(ROW_KEY, INT()), + FIELD(FAMILY1, ROW(FIELD(F1COL1, INT()))), + FIELD(FAMILY2, ROW(FIELD(F2COL1, STRING()), FIELD(F2COL2, BIGINT()))), + FIELD( FAMILY3, - DataTypes.ROW( - DataTypes.FIELD(F3COL1, DataTypes.DOUBLE()), - DataTypes.FIELD(F3COL2, DataTypes.BOOLEAN()), - DataTypes.FIELD(F3COL3, DataTypes.STRING()))) - .build(); - HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema); + ROW( + FIELD(F3COL1, DOUBLE()), + FIELD(F3COL2, DataTypes.BOOLEAN()), + FIELD(F3COL3, STRING())))); + HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType); return new HBaseSerde(hbaseSchema, "null"); }
