This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 224233a3ee [#9080]Improvement(flink): Fix the Flink TypeUtils map 
conversion to use value-type nullability (#9129)
224233a3ee is described below

commit 224233a3ee96035ed9b401ff6b029861cf0c6f0d
Author: Jackeyzhe <[email protected]>
AuthorDate: Thu Dec 4 16:57:39 2025 +0800

    [#9080]Improvement(flink): Fix the Flink TypeUtils map conversion to use 
value-type nullability (#9129)
    
    ### What changes were proposed in this pull request?
    
    Fix the Flink TypeUtils map conversion to use value-type nullability
    instead of map-level nullability.
    
    ### Why are the changes needed?
    
    
    Fix: #9080
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    update some unit tests
    
    ---------
    
    Co-authored-by: jackeyzhe <[email protected]>
---
 .../gravitino/flink/connector/utils/TypeUtils.java | 12 ++++++-----
 .../flink/connector/utils/TestTypeUtils.java       | 23 ++++++++++++++++++----
 2 files changed, 26 insertions(+), 9 deletions(-)

diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
index 3ce11846d7..e988d082b1 100644
--- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
@@ -102,13 +102,15 @@ public class TypeUtils {
         return Types.TimestampType.withTimeZone(zonedPrecision);
       case ARRAY:
         ArrayType arrayType = (ArrayType) logicalType;
-        Type elementType = toGravitinoType(arrayType.getElementType());
-        return Types.ListType.of(elementType, arrayType.isNullable());
+        LogicalType elementLogicalType = arrayType.getElementType();
+        Type elementType = toGravitinoType(elementLogicalType);
+        return Types.ListType.of(elementType, elementLogicalType.isNullable());
       case MAP:
         MapType mapType = (MapType) logicalType;
-        Type keyType = toGravitinoType(mapType.getKeyType());
-        Type valueType = toGravitinoType(mapType.getValueType());
-        return Types.MapType.of(keyType, valueType, mapType.isNullable());
+        LogicalType keyType = mapType.getKeyType();
+        LogicalType valueType = mapType.getValueType();
+        return Types.MapType.of(
+            toGravitinoType(keyType), toGravitinoType(valueType), 
valueType.isNullable());
       case ROW:
         RowType rowType = (RowType) logicalType;
         Types.StructType.Field[] fields =
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
index 438e5fbe37..a7e94533b2 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
@@ -86,18 +86,29 @@ public class TestTypeUtils {
             new 
YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.YEAR)));
     Assertions.assertEquals(
         Types.ListType.notNull(Types.IntegerType.get()),
-        TypeUtils.toGravitinoType(new ArrayType(false, new IntType())));
+        TypeUtils.toGravitinoType(new ArrayType(false, new IntType(false))));
     Assertions.assertEquals(
         Types.ListType.nullable(Types.IntegerType.get()),
-        TypeUtils.toGravitinoType(new ArrayType(true, new IntType())));
+        TypeUtils.toGravitinoType(new ArrayType(true, new IntType(true))));
+    Assertions.assertEquals(
+        Types.ListType.nullable(Types.IntegerType.get()),
+        TypeUtils.toGravitinoType(new ArrayType(false, new IntType(true))));
     Assertions.assertEquals(
         Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
true),
         TypeUtils.toGravitinoType(
-            new MapType(true, new VarCharType(Integer.MAX_VALUE), new 
IntType())));
+            new MapType(true, new VarCharType(Integer.MAX_VALUE), new 
IntType(true))));
+    Assertions.assertEquals(
+        Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
false),
+        TypeUtils.toGravitinoType(
+            new MapType(true, new VarCharType(Integer.MAX_VALUE), new 
IntType(false))));
     Assertions.assertEquals(
         Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
false),
         TypeUtils.toGravitinoType(
-            new MapType(false, new VarCharType(Integer.MAX_VALUE), new 
IntType())));
+            new MapType(false, new VarCharType(Integer.MAX_VALUE), new 
IntType(false))));
+    Assertions.assertEquals(
+        Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
true),
+        TypeUtils.toGravitinoType(
+            new MapType(false, new VarCharType(Integer.MAX_VALUE), new 
IntType(true))));
     Assertions.assertEquals(
         Types.StructType.of(
             Types.StructType.Field.nullableField("a", Types.IntegerType.get()),
@@ -150,6 +161,10 @@ public class TestTypeUtils {
         DataTypes.MAP(DataTypes.STRING(), DataTypes.INT().nullable()),
         TypeUtils.toFlinkType(
             Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
true)));
+    Assertions.assertEquals(
+        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT().notNull()),
+        TypeUtils.toFlinkType(
+            Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(), 
false)));
     Assertions.assertEquals(
         DataTypes.ROW(
             DataTypes.FIELD("a", DataTypes.INT().nullable()),

Reply via email to