This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 224233a3ee [#9080]Improvement(flink): Fix the Flink TypeUtils map
conversion to use value-type nullability (#9129)
224233a3ee is described below
commit 224233a3ee96035ed9b401ff6b029861cf0c6f0d
Author: Jackeyzhe <[email protected]>
AuthorDate: Thu Dec 4 16:57:39 2025 +0800
[#9080]Improvement(flink): Fix the Flink TypeUtils map conversion to use
value-type nullability (#9129)
### What changes were proposed in this pull request?
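Fix the Flink TypeUtils map conversion to use value-type nullability
instead of map-level nullability. The array conversion is changed in the
same way, to use element-type nullability instead of array-level
nullability.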
### Why are the changes needed?
Fix: #9080
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
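Updated the existing unit tests in TestTypeUtils and added cases where the
container-level nullability differs from the element/value-type nullability.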
---------
Co-authored-by: jackeyzhe <[email protected]>
---
.../gravitino/flink/connector/utils/TypeUtils.java | 12 ++++++-----
.../flink/connector/utils/TestTypeUtils.java | 23 ++++++++++++++++++----
2 files changed, 26 insertions(+), 9 deletions(-)
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
index 3ce11846d7..e988d082b1 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
@@ -102,13 +102,15 @@ public class TypeUtils {
return Types.TimestampType.withTimeZone(zonedPrecision);
case ARRAY:
ArrayType arrayType = (ArrayType) logicalType;
- Type elementType = toGravitinoType(arrayType.getElementType());
- return Types.ListType.of(elementType, arrayType.isNullable());
+ LogicalType elementLogicalType = arrayType.getElementType();
+ Type elementType = toGravitinoType(elementLogicalType);
+ return Types.ListType.of(elementType, elementLogicalType.isNullable());
case MAP:
MapType mapType = (MapType) logicalType;
- Type keyType = toGravitinoType(mapType.getKeyType());
- Type valueType = toGravitinoType(mapType.getValueType());
- return Types.MapType.of(keyType, valueType, mapType.isNullable());
+ LogicalType keyType = mapType.getKeyType();
+ LogicalType valueType = mapType.getValueType();
+ return Types.MapType.of(
+ toGravitinoType(keyType), toGravitinoType(valueType),
valueType.isNullable());
case ROW:
RowType rowType = (RowType) logicalType;
Types.StructType.Field[] fields =
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
index 438e5fbe37..a7e94533b2 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
@@ -86,18 +86,29 @@ public class TestTypeUtils {
new
YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.YEAR)));
Assertions.assertEquals(
Types.ListType.notNull(Types.IntegerType.get()),
- TypeUtils.toGravitinoType(new ArrayType(false, new IntType())));
+ TypeUtils.toGravitinoType(new ArrayType(false, new IntType(false))));
Assertions.assertEquals(
Types.ListType.nullable(Types.IntegerType.get()),
- TypeUtils.toGravitinoType(new ArrayType(true, new IntType())));
+ TypeUtils.toGravitinoType(new ArrayType(true, new IntType(true))));
+ Assertions.assertEquals(
+ Types.ListType.nullable(Types.IntegerType.get()),
+ TypeUtils.toGravitinoType(new ArrayType(false, new IntType(true))));
Assertions.assertEquals(
Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(),
true),
TypeUtils.toGravitinoType(
- new MapType(true, new VarCharType(Integer.MAX_VALUE), new
IntType())));
+ new MapType(true, new VarCharType(Integer.MAX_VALUE), new
IntType(true))));
+ Assertions.assertEquals(
+ Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(),
false),
+ TypeUtils.toGravitinoType(
+ new MapType(true, new VarCharType(Integer.MAX_VALUE), new
IntType(false))));
Assertions.assertEquals(
Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(),
false),
TypeUtils.toGravitinoType(
- new MapType(false, new VarCharType(Integer.MAX_VALUE), new
IntType())));
+ new MapType(false, new VarCharType(Integer.MAX_VALUE), new
IntType(false))));
+ Assertions.assertEquals(
+ Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(),
true),
+ TypeUtils.toGravitinoType(
+ new MapType(false, new VarCharType(Integer.MAX_VALUE), new
IntType(true))));
Assertions.assertEquals(
Types.StructType.of(
Types.StructType.Field.nullableField("a", Types.IntegerType.get()),
@@ -150,6 +161,10 @@ public class TestTypeUtils {
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT().nullable()),
TypeUtils.toFlinkType(
Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(),
true)));
+ Assertions.assertEquals(
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT().notNull()),
+ TypeUtils.toFlinkType(
+ Types.MapType.of(Types.StringType.get(), Types.IntegerType.get(),
false)));
Assertions.assertEquals(
DataTypes.ROW(
DataTypes.FIELD("a", DataTypes.INT().nullable()),