weiqingy commented on code in PR #28074:
URL: https://github.com/apache/flink/pull/28074#discussion_r3370989265


##########
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java:
##########
@@ -344,7 +346,9 @@ public TypeSerializerSchemaCompatibility<RowData> resolveSchemaCompatibility(
 
             RowDataSerializerSnapshot oldRowDataSerializerSnapshot =
                     (RowDataSerializerSnapshot) oldSerializerSnapshot;
-            if (!Arrays.equals(types, oldRowDataSerializerSnapshot.types)) {
+            // Allow NOT NULL -> NULL widening; reject NULL -> NOT NULL narrowing.

Review Comment:
   The relaxation rests on the internal binary layout being independent of 
declared nullability — `BinaryRowData` always reserves the null bitmask, so a 
field written as NOT NULL reads back through a nullable serializer with no 
migration, which is what makes returning `compatibleAsIs` (rather than 
`compatibleAfterMigration`) correct. That invariant is load-bearing but isn't 
captured anywhere in the change. Worth a one-line comment recording why 
widening is binary-compatible-as-is, so the next maintainer doesn't have to 
re-derive it? And to confirm: does the same hold for the mask-based Python 
serializers?
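To make the invariant concrete, here is a minimal sketch. It uses a toy row format (the hypothetical `NullBitmaskLayoutSketch`, not Flink's actual `BinaryRowData` layout) that always writes a null bitmask, whatever the declared nullability. Under that assumption, bytes written under a NOT NULL schema read back unchanged under a nullable schema, because widening only removes a check and never moves an offset. The reverse direction is only safe while no nulls are present, which is why narrowing has to be rejected.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Hypothetical toy row format (NOT Flink's BinaryRowData): a one-byte null bitmask followed by
 * one fixed 8-byte slot per field. The bitmask is always written, whatever the declared
 * nullability, so the byte layout does not depend on NOT NULL vs NULL.
 */
final class NullBitmaskLayoutSketch {

    /** Serializes up to 8 BIGINT-like fields; a null sets its bit and leaves the slot zeroed. */
    static byte[] write(Long[] values) {
        if (values.length > 8) {
            throw new IllegalArgumentException("toy format supports at most 8 fields");
        }
        ByteBuffer buf = ByteBuffer.allocate(1 + 8 * values.length);
        byte mask = 0;
        for (int i = 0; i < values.length; i++) {
            if (values[i] == null) {
                mask |= (byte) (1 << i);
            }
        }
        buf.put(mask);
        for (Long v : values) {
            buf.putLong(v == null ? 0L : v);
        }
        return buf.array();
    }

    /** Reads a row; the declared nullability only adds a check and never changes an offset. */
    static Long[] read(byte[] bytes, boolean[] declaredNullable) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte mask = buf.get();
        Long[] out = new Long[declaredNullable.length];
        for (int i = 0; i < out.length; i++) {
            long slot = buf.getLong();
            boolean isNull = (mask & (1 << i)) != 0;
            if (isNull && !declaredNullable[i]) {
                throw new IllegalStateException("null found in NOT NULL field " + i);
            }
            out[i] = isNull ? null : slot;
        }
        return out;
    }

    public static void main(String[] args) {
        // Written while both fields were declared NOT NULL (so no nulls can occur) ...
        byte[] bytes = write(new Long[] {42L, 7L});
        // ... and read back after widening both fields to nullable: same bytes, same offsets.
        System.out.println(Arrays.toString(read(bytes, new boolean[] {true, true}))); // [42, 7]
        // Narrowing is unsafe: a row written under a nullable schema may contain a null.
        byte[] withNull = write(new Long[] {null, 7L});
        try {
            read(withNull, new boolean[] {false, false});
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // null found in NOT NULL field 0
        }
    }
}
```

If the real layout behaves like this toy, widening needs no data rewrite, and that is the argument for `compatibleAsIs` over `compatibleAfterMigration`.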



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java:
##########
@@ -50,11 +57,59 @@ public final class LogicalTypeUtils {
     private static final String ATOMIC_FIELD_NAME = "f0";
 
     private static final TimeAttributeRemover TIME_ATTRIBUTE_REMOVER = new TimeAttributeRemover();
+    private static final NullabilityNormalizer NULLABILITY_NORMALIZER = new NullabilityNormalizer();
 
     public static LogicalType removeTimeAttributes(LogicalType logicalType) {
         return logicalType.accept(TIME_ATTRIBUTE_REMOVER);
     }
 
+    public static LogicalType normalizeNullability(LogicalType logicalType) {
+        return logicalType.accept(NULLABILITY_NORMALIZER);
+    }
+
+    /**
+     * Returns true when new types are structurally equal to old types ignoring nullability, and no
+     * type narrows from nullable to non-nullable.
+     */
+    public static boolean areTypesCompatibleAfterNullabilityWidening(
+            LogicalType[] newTypes, LogicalType[] oldTypes) {
+        if (newTypes == oldTypes) {
+            return true;
+        }
+        if (newTypes == null || oldTypes == null || newTypes.length != oldTypes.length) {
+            return false;
+        }
+        for (int i = 0; i < newTypes.length; i++) {
+            if (!isTypeCompatibleAfterNullabilityWidening(newTypes[i], oldTypes[i])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static boolean isTypeCompatibleAfterNullabilityWidening(
+            LogicalType newType, LogicalType oldType) {
+        if (newType == oldType) {
+            return true;
+        }
+        if (newType == null || oldType == null) {
+            return false;
+        }
+        return normalizeNullability(newType).equals(normalizeNullability(oldType))
+                && !hasNullabilityNarrowing(newType, oldType);
+    }
+
+    public static boolean isSerializerCompatibleAfterNullabilityWidening(

Review Comment:
   This helper and the private `resolveSchemaCompatibility` below operate 
purely on `TypeSerializer` / `TypeSerializerSnapshot`, whereas the rest of 
`LogicalTypeUtils` works on logical types — so this pulls the 
serializer-snapshot framework into a logical-type utility in 
flink-table-common. The type-level helpers (`normalizeNullability`, 
`isTypeCompatibleAfterNullabilityWidening`, 
`areTypesCompatibleAfterNullabilityWidening`) sit naturally here; would the 
serializer helper fit better in a serializer-focused util nearer the 
serializers (e.g. in flink-table-type-utils)? Curious whether there's a reason 
to keep them together.
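Here is a rough sketch of that split. It uses hypothetical toy types (`ToyType`, `ToySnapshot`, `ToyLogicalTypeUtils`, `ToySerializerCompatUtils`) in place of Flink's `LogicalType` / `TypeSerializerSnapshot`, and the method names only mirror the ones in this diff. The type-level helpers never reference serializers. The serializer-side helper walks the snapshot's field types and delegates down, so only the serializer module needs to depend on both.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical toy logical type: a type root, a nullability flag and nested element types. */
record ToyType(String root, boolean nullable, List<ToyType> children) {
    static ToyType of(String root, boolean nullable, ToyType... children) {
        return new ToyType(root, nullable, Arrays.asList(children));
    }
}

/** Type-level helpers: they only see logical types, never serializers or snapshots. */
final class ToyLogicalTypeUtils {
    private ToyLogicalTypeUtils() {}

    /** Marks every level of the type tree as nullable so that only the structure remains. */
    static ToyType normalizeNullability(ToyType type) {
        ToyType[] kids = new ToyType[type.children().size()];
        for (int i = 0; i < kids.length; i++) {
            kids[i] = normalizeNullability(type.children().get(i));
        }
        return ToyType.of(type.root(), true, kids);
    }

    /** Assumes equal structure; true if any position goes from nullable (old) to NOT NULL (new). */
    static boolean hasNullabilityNarrowing(ToyType newType, ToyType oldType) {
        if (oldType.nullable() && !newType.nullable()) {
            return true;
        }
        for (int i = 0; i < newType.children().size(); i++) {
            if (hasNullabilityNarrowing(newType.children().get(i), oldType.children().get(i))) {
                return true;
            }
        }
        return false;
    }

    static boolean isTypeCompatibleAfterNullabilityWidening(ToyType newType, ToyType oldType) {
        if (newType == oldType) {
            return true;
        }
        if (newType == null || oldType == null) {
            return false;
        }
        // Structural check first, so the narrowing walk only ever sees equal shapes.
        return normalizeNullability(newType).equals(normalizeNullability(oldType))
                && !hasNullabilityNarrowing(newType, oldType);
    }
}

/** Hypothetical snapshot that only records the field types of a row serializer. */
record ToySnapshot(List<ToyType> fieldTypes) {}

/** Serializer-side helper: lives next to the serializers and delegates to the type-level check. */
final class ToySerializerCompatUtils {
    private ToySerializerCompatUtils() {}

    static boolean isCompatibleAsIs(ToySnapshot newSnapshot, ToySnapshot oldSnapshot) {
        List<ToyType> newTypes = newSnapshot.fieldTypes();
        List<ToyType> oldTypes = oldSnapshot.fieldTypes();
        if (newTypes.size() != oldTypes.size()) {
            return false;
        }
        for (int i = 0; i < newTypes.size(); i++) {
            if (!ToyLogicalTypeUtils.isTypeCompatibleAfterNullabilityWidening(
                    newTypes.get(i), oldTypes.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ToyType arrayOfIntNotNull = ToyType.of("ARRAY", false, ToyType.of("INT", false));
        ToyType arrayOfInt = ToyType.of("ARRAY", true, ToyType.of("INT", true));
        ToySnapshot before = new ToySnapshot(List.of(arrayOfIntNotNull));
        ToySnapshot after = new ToySnapshot(List.of(arrayOfInt));
        System.out.println(isCompatibleAsIs(after, before)); // true: NOT NULL -> NULL widening
        System.out.println(isCompatibleAsIs(before, after)); // false: NULL -> NOT NULL narrowing
    }
}
```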



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerSnapshotTest.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs;
+import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for serializer snapshots. */
+class SerializerSnapshotTest {

Review Comment:
   The serializers under test (`RowDataSerializer`, `ArrayDataSerializer`, 
`MapDataSerializer`) live in flink-table-type-utils, which already has 
`RowDataSerializerTest` / `ArrayDataSerializerTest` / `MapDataSerializerTest` — 
but this new test lands in flink-table-runtime. Is there a build reason it 
can't sit alongside the code and the existing serializer tests in 
flink-table-type-utils? Keeping it there would put the scaffolding next to what 
it exercises (the near-identical python copy is harder to avoid, since that's a 
separate module).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
