This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ed19176f [#3982]feat(flink-connector):Support more column data type 
for hive table (#4265)
1ed19176f is described below

commit 1ed19176f50a89e9f9735a65b7e4f6e97fe65281
Author: Peidian li <[email protected]>
AuthorDate: Mon Jul 29 20:32:34 2024 +0800

    [#3982]feat(flink-connector):Support more column data type for hive table 
(#4265)
    
    ### What changes were proposed in this pull request?
    
    - Support more columns for hive table
    
    ### Why are the changes needed?
    
    Fix: #3982
    
    ### Does this PR introduce _any_ user-facing change?
    
    - no
    
    ### How was this patch tested?
    
    - add UTs and ITs:
    
flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
    -
    
flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
---
 flink-connector/build.gradle.kts                   |   1 +
 .../gravitino/flink/connector/utils/TypeUtils.java | 146 +++++++++++++++
 .../integration/test/hive/FlinkHiveCatalogIT.java  | 207 +++++++++++++++++++++
 .../flink/connector/utils/TestTypeUtils.java       | 108 +++++++++++
 4 files changed, 462 insertions(+)

diff --git a/flink-connector/build.gradle.kts b/flink-connector/build.gradle.kts
index 6a59ba528..f6776661b 100644
--- a/flink-connector/build.gradle.kts
+++ b/flink-connector/build.gradle.kts
@@ -150,6 +150,7 @@ tasks.test {
     exclude("**/integration/**")
   } else {
     dependsOn(tasks.jar)
+    dependsOn(":catalogs:catalog-hive:jar")
 
     doFirst {
       environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", 
"datastrato/gravitino-ci-hive:0.1.13")
diff --git 
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
 
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
index 7c5635037..8766bea83 100644
--- 
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
+++ 
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
@@ -19,9 +19,18 @@
 
 package org.apache.gravitino.flink.connector.utils;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.rel.types.Types;
 
@@ -39,6 +48,71 @@ public class TypeUtils {
         return Types.IntegerType.get();
       case BIGINT:
         return Types.LongType.get();
+      case CHAR:
+        CharType charType = (CharType) logicalType;
+        return Types.FixedCharType.of(charType.getLength());
+      case BOOLEAN:
+        return Types.BooleanType.get();
+      case BINARY:
+        BinaryType binaryType = (BinaryType) logicalType;
+        return Types.FixedType.of(binaryType.getLength());
+      case VARBINARY:
+        return Types.BinaryType.get();
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) logicalType;
+        return Types.DecimalType.of(decimalType.getPrecision(), 
decimalType.getScale());
+      case TINYINT:
+        return Types.ByteType.get();
+      case SMALLINT:
+        return Types.ShortType.get();
+      case DATE:
+        return Types.DateType.get();
+      case TIME_WITHOUT_TIME_ZONE:
+        return Types.TimeType.get();
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        return Types.TimestampType.withoutTimeZone();
+      case INTERVAL_YEAR_MONTH:
+        return Types.IntervalYearType.get();
+      case INTERVAL_DAY_TIME:
+        return Types.IntervalDayType.get();
+      case FLOAT:
+        return Types.FloatType.get();
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+      case TIMESTAMP_WITH_TIME_ZONE:
+        return Types.TimestampType.withTimeZone();
+      case ARRAY:
+        ArrayType arrayType = (ArrayType) logicalType;
+        Type elementType = toGravitinoType(arrayType.getElementType());
+        return Types.ListType.of(elementType, arrayType.isNullable());
+      case MAP:
+        MapType mapType = (MapType) logicalType;
+        Type keyType = toGravitinoType(mapType.getKeyType());
+        Type valueType = toGravitinoType(mapType.getValueType());
+        return Types.MapType.of(keyType, valueType, mapType.isNullable());
+      case ROW:
+        RowType rowType = (RowType) logicalType;
+        Types.StructType.Field[] fields =
+            rowType.getFields().stream()
+                .map(
+                    field -> {
+                      LogicalType fieldLogicalType = field.getType();
+                      Type fieldType = toGravitinoType(fieldLogicalType);
+                      return Types.StructType.Field.of(
+                          field.getName(),
+                          fieldType,
+                          fieldLogicalType.isNullable(),
+                          field.getDescription().orElse(null));
+                    })
+                .toArray(Types.StructType.Field[]::new);
+        return Types.StructType.of(fields);
+      case NULL:
+        return Types.NullType.get();
+      case MULTISET:
+      case STRUCTURED_TYPE:
+      case UNRESOLVED:
+      case DISTINCT_TYPE:
+      case RAW:
+      case SYMBOL:
       default:
         throw new UnsupportedOperationException(
             "Not support type: " + logicalType.asSummaryString());
@@ -55,8 +129,80 @@ public class TypeUtils {
         return DataTypes.INT();
       case LONG:
         return DataTypes.BIGINT();
+      case FLOAT:
+        return DataTypes.FLOAT();
+      case SHORT:
+        return DataTypes.SMALLINT();
+      case DECIMAL:
+        Types.DecimalType decimalType = (Types.DecimalType) gravitinoType;
+        return DataTypes.DECIMAL(decimalType.precision(), decimalType.scale());
+      case VARCHAR:
+        Types.VarCharType varCharType = (Types.VarCharType) gravitinoType;
+        return DataTypes.VARCHAR(varCharType.length());
+      case FIXED:
+        Types.FixedType fixedType = (Types.FixedType) gravitinoType;
+        return DataTypes.BINARY(fixedType.length());
+      case FIXEDCHAR:
+        Types.FixedCharType charType = (Types.FixedCharType) gravitinoType;
+        return DataTypes.CHAR(charType.length());
+      case BINARY:
+        return DataTypes.BYTES();
+      case BYTE:
+        return DataTypes.TINYINT();
+      case BOOLEAN:
+        return DataTypes.BOOLEAN();
+      case DATE:
+        return DataTypes.DATE();
+      case TIMESTAMP:
+        Types.TimestampType timestampType = (Types.TimestampType) 
gravitinoType;
+        if (timestampType.hasTimeZone()) {
+          return DataTypes.TIMESTAMP_LTZ();
+        } else {
+          return DataTypes.TIMESTAMP();
+        }
+      case LIST:
+        Types.ListType listType = (Types.ListType) gravitinoType;
+        return DataTypes.ARRAY(
+            nullable(toFlinkType(listType.elementType()), 
listType.elementNullable()));
+      case MAP:
+        Types.MapType mapType = (Types.MapType) gravitinoType;
+        return DataTypes.MAP(
+            toFlinkType(mapType.keyType()),
+            nullable(toFlinkType(mapType.valueType()), 
mapType.valueNullable()));
+      case STRUCT:
+        Types.StructType structType = (Types.StructType) gravitinoType;
+        List<DataTypes.Field> fields =
+            Arrays.stream(structType.fields())
+                .map(
+                    f -> {
+                      if (f.comment() == null) {
+                        return DataTypes.FIELD(
+                            f.name(), nullable(toFlinkType(f.type()), 
f.nullable()));
+                      } else {
+                        return DataTypes.FIELD(
+                            f.name(), nullable(toFlinkType(f.type()), 
f.nullable()), f.comment());
+                      }
+                    })
+                .collect(Collectors.toList());
+        return DataTypes.ROW(fields);
+      case NULL:
+        return DataTypes.NULL();
+      case TIME:
+        return DataTypes.TIME();
+      case INTERVAL_YEAR:
+        return DataTypes.INTERVAL(DataTypes.YEAR());
+      case INTERVAL_DAY:
+        return DataTypes.INTERVAL(DataTypes.DAY());
       default:
         throw new UnsupportedOperationException("Not support " + 
gravitinoType.toString());
     }
   }
+
+  private static DataType nullable(DataType dataType, boolean nullable) {
+    if (nullable) {
+      return dataType.nullable();
+    } else {
+      return dataType.notNull();
+    }
+  }
 }
diff --git 
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
 
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
index 09c264796..0da3820d9 100644
--- 
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++ 
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
@@ -19,6 +19,8 @@
 package org.apache.gravitino.flink.connector.integration.test.hive;
 
 import static 
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns;
+import static 
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn;
+import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
@@ -29,15 +31,18 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ResultKind;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.catalog.DefaultCatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
 import org.apache.flink.types.Row;
 import org.apache.gravitino.NameIdentifier;
@@ -377,6 +382,208 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
         true);
   }
 
+  @Test
+  public void testCreateHiveTable() {
+    String databaseName = "test_create_hive_table_db";
+    String tableName = "test_create_hive_table";
+    String comment = "test comment";
+    String key = "test key";
+    String value = "test value";
+
+    // 1. The NOT NULL constraint for column is only supported since Hive 3.0,
+    // but the current Gravitino Hive catalog only supports Hive 2.x.
+    // 2. Hive doesn't support Time and Timestamp with timezone type.
+    // 3. Flink SQL only support to create Interval Month and Second(3).
+    doWithSchema(
+        metalake.loadCatalog(DEFAULT_HIVE_CATALOG),
+        databaseName,
+        catalog -> {
+          TableResult result =
+              sql(
+                  "CREATE TABLE %s "
+                      + "(string_type STRING COMMENT 'string_type', "
+                      + " double_type DOUBLE COMMENT 'double_type',"
+                      + " int_type INT COMMENT 'int_type',"
+                      + " varchar_type VARCHAR COMMENT 'varchar_type',"
+                      + " char_type CHAR COMMENT 'char_type',"
+                      + " boolean_type BOOLEAN COMMENT 'boolean_type',"
+                      + " byte_type TINYINT COMMENT 'byte_type',"
+                      + " binary_type VARBINARY(10) COMMENT 'binary_type',"
+                      + " decimal_type DECIMAL(10, 2) COMMENT 'decimal_type',"
+                      + " bigint_type BIGINT COMMENT 'bigint_type',"
+                      + " float_type FLOAT COMMENT 'float_type',"
+                      + " date_type DATE COMMENT 'date_type',"
+                      + " timestamp_type TIMESTAMP COMMENT 'timestamp_type',"
+                      + " smallint_type SMALLINT COMMENT 'smallint_type',"
+                      + " array_type ARRAY<INT> COMMENT 'array_type',"
+                      + " map_type MAP<INT, STRING> COMMENT 'map_type',"
+                      + " struct_type ROW<k1 INT, k2 String>)"
+                      + " COMMENT '%s' WITH ("
+                      + "'%s' = '%s')",
+                  tableName, comment, key, value);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+          Table table =
+              
catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
+          Assertions.assertNotNull(table);
+          Assertions.assertEquals(comment, table.comment());
+          Assertions.assertEquals(value, table.properties().get(key));
+          Column[] columns =
+              new Column[] {
+                Column.of("string_type", Types.StringType.get(), 
"string_type", true, false, null),
+                Column.of("double_type", Types.DoubleType.get(), 
"double_type"),
+                Column.of("int_type", Types.IntegerType.get(), "int_type"),
+                Column.of("varchar_type", Types.StringType.get(), 
"varchar_type"),
+                Column.of("char_type", Types.FixedCharType.of(1), "char_type"),
+                Column.of("boolean_type", Types.BooleanType.get(), 
"boolean_type"),
+                Column.of("byte_type", Types.ByteType.get(), "byte_type"),
+                Column.of("binary_type", Types.BinaryType.get(), 
"binary_type"),
+                Column.of("decimal_type", Types.DecimalType.of(10, 2), 
"decimal_type"),
+                Column.of("bigint_type", Types.LongType.get(), "bigint_type"),
+                Column.of("float_type", Types.FloatType.get(), "float_type"),
+                Column.of("date_type", Types.DateType.get(), "date_type"),
+                Column.of(
+                    "timestamp_type", Types.TimestampType.withoutTimeZone(), 
"timestamp_type"),
+                Column.of("smallint_type", Types.ShortType.get(), 
"smallint_type"),
+                Column.of(
+                    "array_type", Types.ListType.of(Types.IntegerType.get(), 
true), "array_type"),
+                Column.of(
+                    "map_type",
+                    Types.MapType.of(Types.IntegerType.get(), 
Types.StringType.get(), true),
+                    "map_type"),
+                Column.of(
+                    "struct_type",
+                    Types.StructType.of(
+                        Types.StructType.Field.nullableField("k1", 
Types.IntegerType.get()),
+                        Types.StructType.Field.nullableField("k2", 
Types.StringType.get())),
+                    null)
+              };
+          assertColumns(columns, table.columns());
+          Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning());
+        },
+        true);
+  }
+
+  @Test
+  public void testGetHiveTable() {
+    Column[] columns =
+        new Column[] {
+          Column.of("string_type", Types.StringType.get(), "string_type", 
true, false, null),
+          Column.of("double_type", Types.DoubleType.get(), "double_type"),
+          Column.of("int_type", Types.IntegerType.get(), "int_type"),
+          Column.of("varchar_type", Types.StringType.get(), "varchar_type"),
+          Column.of("char_type", Types.FixedCharType.of(1), "char_type"),
+          Column.of("boolean_type", Types.BooleanType.get(), "boolean_type"),
+          Column.of("byte_type", Types.ByteType.get(), "byte_type"),
+          Column.of("binary_type", Types.BinaryType.get(), "binary_type"),
+          Column.of("decimal_type", Types.DecimalType.of(10, 2), 
"decimal_type"),
+          Column.of("bigint_type", Types.LongType.get(), "bigint_type"),
+          Column.of("float_type", Types.FloatType.get(), "float_type"),
+          Column.of("date_type", Types.DateType.get(), "date_type"),
+          Column.of("timestamp_type", Types.TimestampType.withoutTimeZone(), 
"timestamp_type"),
+          Column.of("smallint_type", Types.ShortType.get(), "smallint_type"),
+          Column.of("fixed_char_type", Types.FixedCharType.of(10), 
"fixed_char_type"),
+          Column.of("array_type", Types.ListType.of(Types.IntegerType.get(), 
true), "array_type"),
+          Column.of(
+              "map_type",
+              Types.MapType.of(Types.IntegerType.get(), 
Types.StringType.get(), true),
+              "map_type"),
+          Column.of(
+              "struct_type",
+              Types.StructType.of(
+                  Types.StructType.Field.nullableField("k1", 
Types.IntegerType.get()),
+                  Types.StructType.Field.nullableField("k2", 
Types.StringType.get())),
+              null)
+        };
+
+    String databaseName = "test_get_hive_table_db";
+    doWithSchema(
+        metalake.loadCatalog(DEFAULT_HIVE_CATALOG),
+        databaseName,
+        catalog -> {
+          String tableName = "test_desc_table";
+          String comment = "comment1";
+          catalog
+              .asTableCatalog()
+              .createTable(
+                  NameIdentifier.of(databaseName, "test_desc_table"),
+                  columns,
+                  comment,
+                  ImmutableMap.of("k1", "v1"));
+
+          Optional<org.apache.flink.table.catalog.Catalog> flinkCatalog =
+              tableEnv.getCatalog(DEFAULT_HIVE_CATALOG);
+          Assertions.assertTrue(flinkCatalog.isPresent());
+          try {
+            CatalogBaseTable table =
+                flinkCatalog.get().getTable(new ObjectPath(databaseName, 
tableName));
+            Assertions.assertNotNull(table);
+            Assertions.assertEquals(CatalogBaseTable.TableKind.TABLE, 
table.getTableKind());
+            Assertions.assertEquals(comment, table.getComment());
+
+            org.apache.flink.table.catalog.Column[] expected =
+                new org.apache.flink.table.catalog.Column[] {
+                  
org.apache.flink.table.catalog.Column.physical("string_type", 
DataTypes.STRING())
+                      .withComment("string_type"),
+                  
org.apache.flink.table.catalog.Column.physical("double_type", 
DataTypes.DOUBLE())
+                      .withComment("double_type"),
+                  org.apache.flink.table.catalog.Column.physical("int_type", 
DataTypes.INT())
+                      .withComment("int_type"),
+                  org.apache.flink.table.catalog.Column.physical(
+                          "varchar_type", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                      .withComment("varchar_type"),
+                  org.apache.flink.table.catalog.Column.physical("char_type", 
DataTypes.CHAR(1))
+                      .withComment("char_type"),
+                  org.apache.flink.table.catalog.Column.physical(
+                          "boolean_type", DataTypes.BOOLEAN())
+                      .withComment("boolean_type"),
+                  org.apache.flink.table.catalog.Column.physical("byte_type", 
DataTypes.TINYINT())
+                      .withComment("byte_type"),
+                  
org.apache.flink.table.catalog.Column.physical("binary_type", DataTypes.BYTES())
+                      .withComment("binary_type"),
+                  org.apache.flink.table.catalog.Column.physical(
+                          "decimal_type", DataTypes.DECIMAL(10, 2))
+                      .withComment("decimal_type"),
+                  
org.apache.flink.table.catalog.Column.physical("bigint_type", 
DataTypes.BIGINT())
+                      .withComment("bigint_type"),
+                  org.apache.flink.table.catalog.Column.physical("float_type", 
DataTypes.FLOAT())
+                      .withComment("float_type"),
+                  org.apache.flink.table.catalog.Column.physical("date_type", 
DataTypes.DATE())
+                      .withComment("date_type"),
+                  org.apache.flink.table.catalog.Column.physical(
+                          "timestamp_type", DataTypes.TIMESTAMP())
+                      .withComment("timestamp_type"),
+                  org.apache.flink.table.catalog.Column.physical(
+                          "smallint_type", DataTypes.SMALLINT())
+                      .withComment("smallint_type"),
+                  org.apache.flink.table.catalog.Column.physical(
+                          "fixed_char_type", DataTypes.CHAR(10))
+                      .withComment("fixed_char_type"),
+                  org.apache.flink.table.catalog.Column.physical(
+                          "array_type", DataTypes.ARRAY(DataTypes.INT()))
+                      .withComment("array_type"),
+                  org.apache.flink.table.catalog.Column.physical(
+                          "map_type", DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()))
+                      .withComment("map_type"),
+                  org.apache.flink.table.catalog.Column.physical(
+                      "struct_type",
+                      DataTypes.ROW(
+                          DataTypes.FIELD("k1", DataTypes.INT()),
+                          DataTypes.FIELD("k2", DataTypes.STRING())))
+                };
+            org.apache.flink.table.catalog.Column[] actual =
+                
toFlinkPhysicalColumn(table.getUnresolvedSchema().getColumns());
+            Assertions.assertArrayEquals(expected, actual);
+
+            CatalogTable catalogTable = (CatalogTable) table;
+            Assertions.assertFalse(catalogTable.isPartitioned());
+          } catch (TableNotExistException e) {
+            Assertions.fail(e);
+          }
+        },
+        true);
+  }
+
   @Override
   protected org.apache.gravitino.Catalog currentCatalog() {
     return hiveCatalog;
diff --git 
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
 
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
index e9b1f8343..8a9297cb5 100644
--- 
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
+++ 
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
@@ -21,11 +21,29 @@ package org.apache.gravitino.flink.connector.utils;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
 import org.apache.gravitino.rel.types.Types;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -39,6 +57,57 @@ public class TestTypeUtils {
     Assertions.assertEquals(Types.DoubleType.get(), 
TypeUtils.toGravitinoType(new DoubleType()));
     Assertions.assertEquals(Types.IntegerType.get(), 
TypeUtils.toGravitinoType(new IntType()));
     Assertions.assertEquals(Types.LongType.get(), 
TypeUtils.toGravitinoType(new BigIntType()));
+    Assertions.assertEquals(
+        Types.FixedCharType.of(10), TypeUtils.toGravitinoType(new 
CharType(10)));
+    Assertions.assertEquals(Types.BooleanType.get(), 
TypeUtils.toGravitinoType(new BooleanType()));
+    Assertions.assertEquals(Types.FixedType.of(10), 
TypeUtils.toGravitinoType(new BinaryType(10)));
+    Assertions.assertEquals(Types.ByteType.get(), 
TypeUtils.toGravitinoType(new TinyIntType()));
+    Assertions.assertEquals(Types.DateType.get(), 
TypeUtils.toGravitinoType(new DateType()));
+    Assertions.assertEquals(Types.BinaryType.get(), 
TypeUtils.toGravitinoType(new VarBinaryType()));
+    Assertions.assertEquals(
+        Types.DecimalType.of(10, 3), TypeUtils.toGravitinoType(new 
DecimalType(10, 3)));
+    Assertions.assertEquals(Types.ByteType.get(), 
TypeUtils.toGravitinoType(new TinyIntType()));
+    Assertions.assertEquals(Types.ShortType.get(), 
TypeUtils.toGravitinoType(new SmallIntType()));
+    Assertions.assertEquals(
+        Types.TimestampType.withoutTimeZone(), TypeUtils.toGravitinoType(new 
TimestampType()));
+    Assertions.assertEquals(
+        Types.TimestampType.withTimeZone(), TypeUtils.toGravitinoType(new 
ZonedTimestampType()));
+    Assertions.assertEquals(
+        Types.TimestampType.withTimeZone(),
+        TypeUtils.toGravitinoType(new LocalZonedTimestampType()));
+    Assertions.assertEquals(Types.TimeType.get(), 
TypeUtils.toGravitinoType(new TimeType()));
+    Assertions.assertEquals(
+        Types.IntervalDayType.get(),
+        TypeUtils.toGravitinoType(
+            new 
DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY)));
+    Assertions.assertEquals(
+        Types.IntervalYearType.get(),
+        TypeUtils.toGravitinoType(
+            new 
YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.YEAR)));
+    Assertions.assertEquals(
+        Types.ListType.notNull(Types.IntegerType.get()),
+        TypeUtils.toGravitinoType(new ArrayType(false, new IntType())));
+    Assertions.assertEquals(
+        Types.ListType.nullable(Types.IntegerType.get()),
+        TypeUtils.toGravitinoType(new ArrayType(true, new IntType())));
+    Assertions.assertEquals(
+        Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
true),
+        TypeUtils.toGravitinoType(
+            new MapType(true, new VarCharType(Integer.MAX_VALUE), new 
IntType())));
+    Assertions.assertEquals(
+        Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
false),
+        TypeUtils.toGravitinoType(
+            new MapType(false, new VarCharType(Integer.MAX_VALUE), new 
IntType())));
+    Assertions.assertEquals(
+        Types.StructType.of(
+            Types.StructType.Field.nullableField("a", Types.IntegerType.get()),
+            Types.StructType.Field.notNullField("b", Types.IntegerType.get())),
+        TypeUtils.toGravitinoType(
+            RowType.of(
+                true,
+                new IntType[] {new IntType(true), new IntType(false)},
+                new String[] {"a", "b"})));
+    Assertions.assertEquals(Types.NullType.get(), 
TypeUtils.toGravitinoType(new NullType()));
     Assertions.assertThrows(
         UnsupportedOperationException.class,
         () ->
@@ -52,6 +121,45 @@ public class TestTypeUtils {
     Assertions.assertEquals(DataTypes.STRING(), 
TypeUtils.toFlinkType(Types.StringType.get()));
     Assertions.assertEquals(DataTypes.INT(), 
TypeUtils.toFlinkType(Types.IntegerType.get()));
     Assertions.assertEquals(DataTypes.BIGINT(), 
TypeUtils.toFlinkType(Types.LongType.get()));
+    Assertions.assertEquals(DataTypes.SMALLINT(), 
TypeUtils.toFlinkType(Types.ShortType.get()));
+    Assertions.assertEquals(DataTypes.TINYINT(), 
TypeUtils.toFlinkType(Types.ByteType.get()));
+    Assertions.assertEquals(DataTypes.BOOLEAN(), 
TypeUtils.toFlinkType(Types.BooleanType.get()));
+    Assertions.assertEquals(DataTypes.BYTES(), 
TypeUtils.toFlinkType(Types.BinaryType.get()));
+    Assertions.assertEquals(DataTypes.DATE(), 
TypeUtils.toFlinkType(Types.DateType.get()));
+    Assertions.assertEquals(
+        DataTypes.DECIMAL(10, 3), 
TypeUtils.toFlinkType(Types.DecimalType.of(10, 3)));
+    Assertions.assertEquals(DataTypes.CHAR(10), 
TypeUtils.toFlinkType(Types.FixedCharType.of(10)));
+    Assertions.assertEquals(DataTypes.BYTES(), 
TypeUtils.toFlinkType(Types.BinaryType.get()));
+    Assertions.assertEquals(DataTypes.BINARY(10), 
TypeUtils.toFlinkType(Types.FixedType.of(10)));
+    Assertions.assertEquals(
+        DataTypes.TIMESTAMP(6), 
TypeUtils.toFlinkType(Types.TimestampType.withoutTimeZone()));
+    Assertions.assertEquals(
+        DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6),
+        TypeUtils.toFlinkType(Types.TimestampType.withTimeZone()));
+    Assertions.assertEquals(DataTypes.TIME(), 
TypeUtils.toFlinkType(Types.TimeType.get()));
+    Assertions.assertEquals(
+        DataTypes.INTERVAL(DataTypes.DAY()), 
TypeUtils.toFlinkType(Types.IntervalDayType.get()));
+    Assertions.assertEquals(
+        DataTypes.INTERVAL(DataTypes.YEAR()), 
TypeUtils.toFlinkType(Types.IntervalYearType.get()));
+    Assertions.assertEquals(
+        DataTypes.ARRAY(DataTypes.INT().notNull()),
+        TypeUtils.toFlinkType(Types.ListType.of(Types.IntegerType.get(), 
false)));
+    Assertions.assertEquals(
+        DataTypes.ARRAY(DataTypes.INT().nullable()),
+        TypeUtils.toFlinkType(Types.ListType.of(Types.IntegerType.get(), 
true)));
+    Assertions.assertEquals(
+        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT().nullable()),
+        TypeUtils.toFlinkType(
+            Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
true)));
+    Assertions.assertEquals(
+        DataTypes.ROW(
+            DataTypes.FIELD("a", DataTypes.INT().nullable()),
+            DataTypes.FIELD("b", DataTypes.INT().notNull())),
+        TypeUtils.toFlinkType(
+            Types.StructType.of(
+                Types.StructType.Field.nullableField("a", 
Types.IntegerType.get()),
+                Types.StructType.Field.notNullField("b", 
Types.IntegerType.get()))));
+    Assertions.assertEquals(DataTypes.NULL(), 
TypeUtils.toFlinkType(Types.NullType.get()));
     Assertions.assertThrows(
         UnsupportedOperationException.class,
         () -> TypeUtils.toFlinkType(Types.UnparsedType.of("unknown")));

Reply via email to