This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 1ed19176f [#3982]feat(flink-connector):Support more column data type
for hive table (#4265)
1ed19176f is described below
commit 1ed19176f50a89e9f9735a65b7e4f6e97fe65281
Author: Peidian li <[email protected]>
AuthorDate: Mon Jul 29 20:32:34 2024 +0800
[#3982]feat(flink-connector):Support more column data type for hive table
(#4265)
### What changes were proposed in this pull request?
- Support more column data types for Hive tables
### Why are the changes needed?
Fix: #3982
### Does this PR introduce _any_ user-facing change?
- no
### How was this patch tested?
- Added unit tests and integration tests:
flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
-
flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
---
flink-connector/build.gradle.kts | 1 +
.../gravitino/flink/connector/utils/TypeUtils.java | 146 +++++++++++++++
.../integration/test/hive/FlinkHiveCatalogIT.java | 207 +++++++++++++++++++++
.../flink/connector/utils/TestTypeUtils.java | 108 +++++++++++
4 files changed, 462 insertions(+)
diff --git a/flink-connector/build.gradle.kts b/flink-connector/build.gradle.kts
index 6a59ba528..f6776661b 100644
--- a/flink-connector/build.gradle.kts
+++ b/flink-connector/build.gradle.kts
@@ -150,6 +150,7 @@ tasks.test {
exclude("**/integration/**")
} else {
dependsOn(tasks.jar)
+ dependsOn(":catalogs:catalog-hive:jar")
doFirst {
environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE",
"datastrato/gravitino-ci-hive:0.1.13")
diff --git
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
index 7c5635037..8766bea83 100644
---
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
+++
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
@@ -19,9 +19,18 @@
package org.apache.gravitino.flink.connector.utils;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.gravitino.rel.types.Type;
import org.apache.gravitino.rel.types.Types;
@@ -39,6 +48,71 @@ public class TypeUtils {
return Types.IntegerType.get();
case BIGINT:
return Types.LongType.get();
+ case CHAR:
+ CharType charType = (CharType) logicalType;
+ return Types.FixedCharType.of(charType.getLength());
+ case BOOLEAN:
+ return Types.BooleanType.get();
+ case BINARY:
+ BinaryType binaryType = (BinaryType) logicalType;
+ return Types.FixedType.of(binaryType.getLength());
+ case VARBINARY:
+ return Types.BinaryType.get();
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) logicalType;
+ return Types.DecimalType.of(decimalType.getPrecision(),
decimalType.getScale());
+ case TINYINT:
+ return Types.ByteType.get();
+ case SMALLINT:
+ return Types.ShortType.get();
+ case DATE:
+ return Types.DateType.get();
+ case TIME_WITHOUT_TIME_ZONE:
+ return Types.TimeType.get();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return Types.TimestampType.withoutTimeZone();
+ case INTERVAL_YEAR_MONTH:
+ return Types.IntervalYearType.get();
+ case INTERVAL_DAY_TIME:
+ return Types.IntervalDayType.get();
+ case FLOAT:
+ return Types.FloatType.get();
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ case TIMESTAMP_WITH_TIME_ZONE:
+ return Types.TimestampType.withTimeZone();
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) logicalType;
+ Type elementType = toGravitinoType(arrayType.getElementType());
+ return Types.ListType.of(elementType, arrayType.isNullable());
+ case MAP:
+ MapType mapType = (MapType) logicalType;
+ Type keyType = toGravitinoType(mapType.getKeyType());
+ Type valueType = toGravitinoType(mapType.getValueType());
+ return Types.MapType.of(keyType, valueType, mapType.isNullable());
+ case ROW:
+ RowType rowType = (RowType) logicalType;
+ Types.StructType.Field[] fields =
+ rowType.getFields().stream()
+ .map(
+ field -> {
+ LogicalType fieldLogicalType = field.getType();
+ Type fieldType = toGravitinoType(fieldLogicalType);
+ return Types.StructType.Field.of(
+ field.getName(),
+ fieldType,
+ fieldLogicalType.isNullable(),
+ field.getDescription().orElse(null));
+ })
+ .toArray(Types.StructType.Field[]::new);
+ return Types.StructType.of(fields);
+ case NULL:
+ return Types.NullType.get();
+ case MULTISET:
+ case STRUCTURED_TYPE:
+ case UNRESOLVED:
+ case DISTINCT_TYPE:
+ case RAW:
+ case SYMBOL:
default:
throw new UnsupportedOperationException(
"Not support type: " + logicalType.asSummaryString());
@@ -55,8 +129,80 @@ public class TypeUtils {
return DataTypes.INT();
case LONG:
return DataTypes.BIGINT();
+ case FLOAT:
+ return DataTypes.FLOAT();
+ case SHORT:
+ return DataTypes.SMALLINT();
+ case DECIMAL:
+ Types.DecimalType decimalType = (Types.DecimalType) gravitinoType;
+ return DataTypes.DECIMAL(decimalType.precision(), decimalType.scale());
+ case VARCHAR:
+ Types.VarCharType varCharType = (Types.VarCharType) gravitinoType;
+ return DataTypes.VARCHAR(varCharType.length());
+ case FIXED:
+ Types.FixedType fixedType = (Types.FixedType) gravitinoType;
+ return DataTypes.BINARY(fixedType.length());
+ case FIXEDCHAR:
+ Types.FixedCharType charType = (Types.FixedCharType) gravitinoType;
+ return DataTypes.CHAR(charType.length());
+ case BINARY:
+ return DataTypes.BYTES();
+ case BYTE:
+ return DataTypes.TINYINT();
+ case BOOLEAN:
+ return DataTypes.BOOLEAN();
+ case DATE:
+ return DataTypes.DATE();
+ case TIMESTAMP:
+ Types.TimestampType timestampType = (Types.TimestampType)
gravitinoType;
+ if (timestampType.hasTimeZone()) {
+ return DataTypes.TIMESTAMP_LTZ();
+ } else {
+ return DataTypes.TIMESTAMP();
+ }
+ case LIST:
+ Types.ListType listType = (Types.ListType) gravitinoType;
+ return DataTypes.ARRAY(
+ nullable(toFlinkType(listType.elementType()),
listType.elementNullable()));
+ case MAP:
+ Types.MapType mapType = (Types.MapType) gravitinoType;
+ return DataTypes.MAP(
+ toFlinkType(mapType.keyType()),
+ nullable(toFlinkType(mapType.valueType()),
mapType.valueNullable()));
+ case STRUCT:
+ Types.StructType structType = (Types.StructType) gravitinoType;
+ List<DataTypes.Field> fields =
+ Arrays.stream(structType.fields())
+ .map(
+ f -> {
+ if (f.comment() == null) {
+ return DataTypes.FIELD(
+ f.name(), nullable(toFlinkType(f.type()),
f.nullable()));
+ } else {
+ return DataTypes.FIELD(
+ f.name(), nullable(toFlinkType(f.type()),
f.nullable()), f.comment());
+ }
+ })
+ .collect(Collectors.toList());
+ return DataTypes.ROW(fields);
+ case NULL:
+ return DataTypes.NULL();
+ case TIME:
+ return DataTypes.TIME();
+ case INTERVAL_YEAR:
+ return DataTypes.INTERVAL(DataTypes.YEAR());
+ case INTERVAL_DAY:
+ return DataTypes.INTERVAL(DataTypes.DAY());
default:
throw new UnsupportedOperationException("Not support " +
gravitinoType.toString());
}
}
+
+ private static DataType nullable(DataType dataType, boolean nullable) {
+ if (nullable) {
+ return dataType.nullable();
+ } else {
+ return dataType.notNull();
+ }
+ }
}
diff --git
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
index 09c264796..0da3820d9 100644
---
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
@@ -19,6 +19,8 @@
package org.apache.gravitino.flink.connector.integration.test.hive;
import static
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns;
+import static
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn;
+import static
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
@@ -29,15 +31,18 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
import org.apache.flink.types.Row;
import org.apache.gravitino.NameIdentifier;
@@ -377,6 +382,208 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
true);
}
+ @Test
+ public void testCreateHiveTable() {
+ String databaseName = "test_create_hive_table_db";
+ String tableName = "test_create_hive_table";
+ String comment = "test comment";
+ String key = "test key";
+ String value = "test value";
+
+ // 1. The NOT NULL constraint for column is only supported since Hive 3.0,
+ // but the current Gravitino Hive catalog only supports Hive 2.x.
+ // 2. Hive doesn't support Time and Timestamp with timezone type.
+ // 3. Flink SQL only support to create Interval Month and Second(3).
+ doWithSchema(
+ metalake.loadCatalog(DEFAULT_HIVE_CATALOG),
+ databaseName,
+ catalog -> {
+ TableResult result =
+ sql(
+ "CREATE TABLE %s "
+ + "(string_type STRING COMMENT 'string_type', "
+ + " double_type DOUBLE COMMENT 'double_type',"
+ + " int_type INT COMMENT 'int_type',"
+ + " varchar_type VARCHAR COMMENT 'varchar_type',"
+ + " char_type CHAR COMMENT 'char_type',"
+ + " boolean_type BOOLEAN COMMENT 'boolean_type',"
+ + " byte_type TINYINT COMMENT 'byte_type',"
+ + " binary_type VARBINARY(10) COMMENT 'binary_type',"
+ + " decimal_type DECIMAL(10, 2) COMMENT 'decimal_type',"
+ + " bigint_type BIGINT COMMENT 'bigint_type',"
+ + " float_type FLOAT COMMENT 'float_type',"
+ + " date_type DATE COMMENT 'date_type',"
+ + " timestamp_type TIMESTAMP COMMENT 'timestamp_type',"
+ + " smallint_type SMALLINT COMMENT 'smallint_type',"
+ + " array_type ARRAY<INT> COMMENT 'array_type',"
+ + " map_type MAP<INT, STRING> COMMENT 'map_type',"
+ + " struct_type ROW<k1 INT, k2 String>)"
+ + " COMMENT '%s' WITH ("
+ + "'%s' = '%s')",
+ tableName, comment, key, value);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+ Table table =
+
catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
+ Assertions.assertNotNull(table);
+ Assertions.assertEquals(comment, table.comment());
+ Assertions.assertEquals(value, table.properties().get(key));
+ Column[] columns =
+ new Column[] {
+ Column.of("string_type", Types.StringType.get(),
"string_type", true, false, null),
+ Column.of("double_type", Types.DoubleType.get(),
"double_type"),
+ Column.of("int_type", Types.IntegerType.get(), "int_type"),
+ Column.of("varchar_type", Types.StringType.get(),
"varchar_type"),
+ Column.of("char_type", Types.FixedCharType.of(1), "char_type"),
+ Column.of("boolean_type", Types.BooleanType.get(),
"boolean_type"),
+ Column.of("byte_type", Types.ByteType.get(), "byte_type"),
+ Column.of("binary_type", Types.BinaryType.get(),
"binary_type"),
+ Column.of("decimal_type", Types.DecimalType.of(10, 2),
"decimal_type"),
+ Column.of("bigint_type", Types.LongType.get(), "bigint_type"),
+ Column.of("float_type", Types.FloatType.get(), "float_type"),
+ Column.of("date_type", Types.DateType.get(), "date_type"),
+ Column.of(
+ "timestamp_type", Types.TimestampType.withoutTimeZone(),
"timestamp_type"),
+ Column.of("smallint_type", Types.ShortType.get(),
"smallint_type"),
+ Column.of(
+ "array_type", Types.ListType.of(Types.IntegerType.get(),
true), "array_type"),
+ Column.of(
+ "map_type",
+ Types.MapType.of(Types.IntegerType.get(),
Types.StringType.get(), true),
+ "map_type"),
+ Column.of(
+ "struct_type",
+ Types.StructType.of(
+ Types.StructType.Field.nullableField("k1",
Types.IntegerType.get()),
+ Types.StructType.Field.nullableField("k2",
Types.StringType.get())),
+ null)
+ };
+ assertColumns(columns, table.columns());
+ Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning());
+ },
+ true);
+ }
+
+ @Test
+ public void testGetHiveTable() {
+ Column[] columns =
+ new Column[] {
+ Column.of("string_type", Types.StringType.get(), "string_type",
true, false, null),
+ Column.of("double_type", Types.DoubleType.get(), "double_type"),
+ Column.of("int_type", Types.IntegerType.get(), "int_type"),
+ Column.of("varchar_type", Types.StringType.get(), "varchar_type"),
+ Column.of("char_type", Types.FixedCharType.of(1), "char_type"),
+ Column.of("boolean_type", Types.BooleanType.get(), "boolean_type"),
+ Column.of("byte_type", Types.ByteType.get(), "byte_type"),
+ Column.of("binary_type", Types.BinaryType.get(), "binary_type"),
+ Column.of("decimal_type", Types.DecimalType.of(10, 2),
"decimal_type"),
+ Column.of("bigint_type", Types.LongType.get(), "bigint_type"),
+ Column.of("float_type", Types.FloatType.get(), "float_type"),
+ Column.of("date_type", Types.DateType.get(), "date_type"),
+ Column.of("timestamp_type", Types.TimestampType.withoutTimeZone(),
"timestamp_type"),
+ Column.of("smallint_type", Types.ShortType.get(), "smallint_type"),
+ Column.of("fixed_char_type", Types.FixedCharType.of(10),
"fixed_char_type"),
+ Column.of("array_type", Types.ListType.of(Types.IntegerType.get(),
true), "array_type"),
+ Column.of(
+ "map_type",
+ Types.MapType.of(Types.IntegerType.get(),
Types.StringType.get(), true),
+ "map_type"),
+ Column.of(
+ "struct_type",
+ Types.StructType.of(
+ Types.StructType.Field.nullableField("k1",
Types.IntegerType.get()),
+ Types.StructType.Field.nullableField("k2",
Types.StringType.get())),
+ null)
+ };
+
+ String databaseName = "test_get_hive_table_db";
+ doWithSchema(
+ metalake.loadCatalog(DEFAULT_HIVE_CATALOG),
+ databaseName,
+ catalog -> {
+ String tableName = "test_desc_table";
+ String comment = "comment1";
+ catalog
+ .asTableCatalog()
+ .createTable(
+ NameIdentifier.of(databaseName, "test_desc_table"),
+ columns,
+ comment,
+ ImmutableMap.of("k1", "v1"));
+
+ Optional<org.apache.flink.table.catalog.Catalog> flinkCatalog =
+ tableEnv.getCatalog(DEFAULT_HIVE_CATALOG);
+ Assertions.assertTrue(flinkCatalog.isPresent());
+ try {
+ CatalogBaseTable table =
+ flinkCatalog.get().getTable(new ObjectPath(databaseName,
tableName));
+ Assertions.assertNotNull(table);
+ Assertions.assertEquals(CatalogBaseTable.TableKind.TABLE,
table.getTableKind());
+ Assertions.assertEquals(comment, table.getComment());
+
+ org.apache.flink.table.catalog.Column[] expected =
+ new org.apache.flink.table.catalog.Column[] {
+
org.apache.flink.table.catalog.Column.physical("string_type",
DataTypes.STRING())
+ .withComment("string_type"),
+
org.apache.flink.table.catalog.Column.physical("double_type",
DataTypes.DOUBLE())
+ .withComment("double_type"),
+ org.apache.flink.table.catalog.Column.physical("int_type",
DataTypes.INT())
+ .withComment("int_type"),
+ org.apache.flink.table.catalog.Column.physical(
+ "varchar_type", DataTypes.VARCHAR(Integer.MAX_VALUE))
+ .withComment("varchar_type"),
+ org.apache.flink.table.catalog.Column.physical("char_type",
DataTypes.CHAR(1))
+ .withComment("char_type"),
+ org.apache.flink.table.catalog.Column.physical(
+ "boolean_type", DataTypes.BOOLEAN())
+ .withComment("boolean_type"),
+ org.apache.flink.table.catalog.Column.physical("byte_type",
DataTypes.TINYINT())
+ .withComment("byte_type"),
+
org.apache.flink.table.catalog.Column.physical("binary_type", DataTypes.BYTES())
+ .withComment("binary_type"),
+ org.apache.flink.table.catalog.Column.physical(
+ "decimal_type", DataTypes.DECIMAL(10, 2))
+ .withComment("decimal_type"),
+
org.apache.flink.table.catalog.Column.physical("bigint_type",
DataTypes.BIGINT())
+ .withComment("bigint_type"),
+ org.apache.flink.table.catalog.Column.physical("float_type",
DataTypes.FLOAT())
+ .withComment("float_type"),
+ org.apache.flink.table.catalog.Column.physical("date_type",
DataTypes.DATE())
+ .withComment("date_type"),
+ org.apache.flink.table.catalog.Column.physical(
+ "timestamp_type", DataTypes.TIMESTAMP())
+ .withComment("timestamp_type"),
+ org.apache.flink.table.catalog.Column.physical(
+ "smallint_type", DataTypes.SMALLINT())
+ .withComment("smallint_type"),
+ org.apache.flink.table.catalog.Column.physical(
+ "fixed_char_type", DataTypes.CHAR(10))
+ .withComment("fixed_char_type"),
+ org.apache.flink.table.catalog.Column.physical(
+ "array_type", DataTypes.ARRAY(DataTypes.INT()))
+ .withComment("array_type"),
+ org.apache.flink.table.catalog.Column.physical(
+ "map_type", DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING()))
+ .withComment("map_type"),
+ org.apache.flink.table.catalog.Column.physical(
+ "struct_type",
+ DataTypes.ROW(
+ DataTypes.FIELD("k1", DataTypes.INT()),
+ DataTypes.FIELD("k2", DataTypes.STRING())))
+ };
+ org.apache.flink.table.catalog.Column[] actual =
+
toFlinkPhysicalColumn(table.getUnresolvedSchema().getColumns());
+ Assertions.assertArrayEquals(expected, actual);
+
+ CatalogTable catalogTable = (CatalogTable) table;
+ Assertions.assertFalse(catalogTable.isPartitioned());
+ } catch (TableNotExistException e) {
+ Assertions.fail(e);
+ }
+ },
+ true);
+ }
+
@Override
protected org.apache.gravitino.Catalog currentCatalog() {
return hiveCatalog;
diff --git
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
index e9b1f8343..8a9297cb5 100644
---
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
+++
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
@@ -21,11 +21,29 @@ package org.apache.gravitino.flink.connector.utils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
import org.apache.gravitino.rel.types.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -39,6 +57,57 @@ public class TestTypeUtils {
Assertions.assertEquals(Types.DoubleType.get(),
TypeUtils.toGravitinoType(new DoubleType()));
Assertions.assertEquals(Types.IntegerType.get(),
TypeUtils.toGravitinoType(new IntType()));
Assertions.assertEquals(Types.LongType.get(),
TypeUtils.toGravitinoType(new BigIntType()));
+ Assertions.assertEquals(
+ Types.FixedCharType.of(10), TypeUtils.toGravitinoType(new
CharType(10)));
+ Assertions.assertEquals(Types.BooleanType.get(),
TypeUtils.toGravitinoType(new BooleanType()));
+ Assertions.assertEquals(Types.FixedType.of(10),
TypeUtils.toGravitinoType(new BinaryType(10)));
+ Assertions.assertEquals(Types.ByteType.get(),
TypeUtils.toGravitinoType(new TinyIntType()));
+ Assertions.assertEquals(Types.DateType.get(),
TypeUtils.toGravitinoType(new DateType()));
+ Assertions.assertEquals(Types.BinaryType.get(),
TypeUtils.toGravitinoType(new VarBinaryType()));
+ Assertions.assertEquals(
+ Types.DecimalType.of(10, 3), TypeUtils.toGravitinoType(new
DecimalType(10, 3)));
+ Assertions.assertEquals(Types.ByteType.get(),
TypeUtils.toGravitinoType(new TinyIntType()));
+ Assertions.assertEquals(Types.ShortType.get(),
TypeUtils.toGravitinoType(new SmallIntType()));
+ Assertions.assertEquals(
+ Types.TimestampType.withoutTimeZone(), TypeUtils.toGravitinoType(new
TimestampType()));
+ Assertions.assertEquals(
+ Types.TimestampType.withTimeZone(), TypeUtils.toGravitinoType(new
ZonedTimestampType()));
+ Assertions.assertEquals(
+ Types.TimestampType.withTimeZone(),
+ TypeUtils.toGravitinoType(new LocalZonedTimestampType()));
+ Assertions.assertEquals(Types.TimeType.get(),
TypeUtils.toGravitinoType(new TimeType()));
+ Assertions.assertEquals(
+ Types.IntervalDayType.get(),
+ TypeUtils.toGravitinoType(
+ new
DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY)));
+ Assertions.assertEquals(
+ Types.IntervalYearType.get(),
+ TypeUtils.toGravitinoType(
+ new
YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.YEAR)));
+ Assertions.assertEquals(
+ Types.ListType.notNull(Types.IntegerType.get()),
+ TypeUtils.toGravitinoType(new ArrayType(false, new IntType())));
+ Assertions.assertEquals(
+ Types.ListType.nullable(Types.IntegerType.get()),
+ TypeUtils.toGravitinoType(new ArrayType(true, new IntType())));
+ Assertions.assertEquals(
+ Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(),
true),
+ TypeUtils.toGravitinoType(
+ new MapType(true, new VarCharType(Integer.MAX_VALUE), new
IntType())));
+ Assertions.assertEquals(
+ Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(),
false),
+ TypeUtils.toGravitinoType(
+ new MapType(false, new VarCharType(Integer.MAX_VALUE), new
IntType())));
+ Assertions.assertEquals(
+ Types.StructType.of(
+ Types.StructType.Field.nullableField("a", Types.IntegerType.get()),
+ Types.StructType.Field.notNullField("b", Types.IntegerType.get())),
+ TypeUtils.toGravitinoType(
+ RowType.of(
+ true,
+ new IntType[] {new IntType(true), new IntType(false)},
+ new String[] {"a", "b"})));
+ Assertions.assertEquals(Types.NullType.get(),
TypeUtils.toGravitinoType(new NullType()));
Assertions.assertThrows(
UnsupportedOperationException.class,
() ->
@@ -52,6 +121,45 @@ public class TestTypeUtils {
Assertions.assertEquals(DataTypes.STRING(),
TypeUtils.toFlinkType(Types.StringType.get()));
Assertions.assertEquals(DataTypes.INT(),
TypeUtils.toFlinkType(Types.IntegerType.get()));
Assertions.assertEquals(DataTypes.BIGINT(),
TypeUtils.toFlinkType(Types.LongType.get()));
+ Assertions.assertEquals(DataTypes.SMALLINT(),
TypeUtils.toFlinkType(Types.ShortType.get()));
+ Assertions.assertEquals(DataTypes.TINYINT(),
TypeUtils.toFlinkType(Types.ByteType.get()));
+ Assertions.assertEquals(DataTypes.BOOLEAN(),
TypeUtils.toFlinkType(Types.BooleanType.get()));
+ Assertions.assertEquals(DataTypes.BYTES(),
TypeUtils.toFlinkType(Types.BinaryType.get()));
+ Assertions.assertEquals(DataTypes.DATE(),
TypeUtils.toFlinkType(Types.DateType.get()));
+ Assertions.assertEquals(
+ DataTypes.DECIMAL(10, 3),
TypeUtils.toFlinkType(Types.DecimalType.of(10, 3)));
+ Assertions.assertEquals(DataTypes.CHAR(10),
TypeUtils.toFlinkType(Types.FixedCharType.of(10)));
+ Assertions.assertEquals(DataTypes.BYTES(),
TypeUtils.toFlinkType(Types.BinaryType.get()));
+ Assertions.assertEquals(DataTypes.BINARY(10),
TypeUtils.toFlinkType(Types.FixedType.of(10)));
+ Assertions.assertEquals(
+ DataTypes.TIMESTAMP(6),
TypeUtils.toFlinkType(Types.TimestampType.withoutTimeZone()));
+ Assertions.assertEquals(
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6),
+ TypeUtils.toFlinkType(Types.TimestampType.withTimeZone()));
+ Assertions.assertEquals(DataTypes.TIME(),
TypeUtils.toFlinkType(Types.TimeType.get()));
+ Assertions.assertEquals(
+ DataTypes.INTERVAL(DataTypes.DAY()),
TypeUtils.toFlinkType(Types.IntervalDayType.get()));
+ Assertions.assertEquals(
+ DataTypes.INTERVAL(DataTypes.YEAR()),
TypeUtils.toFlinkType(Types.IntervalYearType.get()));
+ Assertions.assertEquals(
+ DataTypes.ARRAY(DataTypes.INT().notNull()),
+ TypeUtils.toFlinkType(Types.ListType.of(Types.IntegerType.get(),
false)));
+ Assertions.assertEquals(
+ DataTypes.ARRAY(DataTypes.INT().nullable()),
+ TypeUtils.toFlinkType(Types.ListType.of(Types.IntegerType.get(),
true)));
+ Assertions.assertEquals(
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT().nullable()),
+ TypeUtils.toFlinkType(
+ Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(),
true)));
+ Assertions.assertEquals(
+ DataTypes.ROW(
+ DataTypes.FIELD("a", DataTypes.INT().nullable()),
+ DataTypes.FIELD("b", DataTypes.INT().notNull())),
+ TypeUtils.toFlinkType(
+ Types.StructType.of(
+ Types.StructType.Field.nullableField("a",
Types.IntegerType.get()),
+ Types.StructType.Field.notNullField("b",
Types.IntegerType.get()))));
+ Assertions.assertEquals(DataTypes.NULL(),
TypeUtils.toFlinkType(Types.NullType.get()));
Assertions.assertThrows(
UnsupportedOperationException.class,
() -> TypeUtils.toFlinkType(Types.UnparsedType.of("unknown")));