weiqingy commented on code in PR #28074:
URL: https://github.com/apache/flink/pull/28074#discussion_r3370989265
##########
flink-table/flink-table-type-utils/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java:
##########
@@ -344,7 +346,9 @@ public TypeSerializerSchemaCompatibility<RowData> resolveSchemaCompatibility(
RowDataSerializerSnapshot oldRowDataSerializerSnapshot =
(RowDataSerializerSnapshot) oldSerializerSnapshot;
- if (!Arrays.equals(types, oldRowDataSerializerSnapshot.types)) {
+ // Allow NOT NULL -> NULL widening; reject NULL -> NOT NULL narrowing.
Review Comment:
The relaxation rests on the internal binary layout being independent of
declared nullability: `BinaryRowData` always reserves the null bitmask, so a
field written as NOT NULL reads back through a nullable serializer with no
migration. That is what makes returning `compatibleAsIs` (rather than
`compatibleAfterMigration`) correct. The invariant is load-bearing but isn't
recorded anywhere in the change. Would it be worth a one-line comment explaining
why widening is compatible as-is, so the next maintainer doesn't have to
re-derive it? Also, to confirm: does the same hold for the mask-based Python
serializers?
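As a rough illustration of that invariant, here is a toy model. It uses hypothetical names and is not Flink's actual `BinaryRowData` layout, which differs in detail. Because the writer always reserves the bitmask regardless of declared nullability, bytes written under a NOT NULL schema read back unchanged through a nullable one. In this toy, narrowing also shows why it must be rejected: a reader that skips the null check for NOT NULL fields returns the zeroed slot instead of null.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Toy row layout (hypothetical, NOT Flink's BinaryRowData): an 8-byte null bitmask is always
 * reserved, followed by one 8-byte slot per field. Supports at most 64 fields.
 */
final class NullabilityWideningDemo {

    /** Writes a row; the layout never depends on the declared nullability of the fields. */
    static byte[] writeRow(Long[] values) {
        ByteBuffer buf = ByteBuffer.allocate(8 + 8 * values.length);
        long nullBits = 0L;
        for (int i = 0; i < values.length; i++) {
            if (values[i] == null) {
                nullBits |= 1L << i; // mark the field null; its slot stays zero
            } else {
                buf.putLong(8 + 8 * i, values[i]);
            }
        }
        buf.putLong(0, nullBits); // bitmask written even when every field is NOT NULL
        return buf.array();
    }

    /** Reads a row; only fields declared nullable consult the bitmask. */
    static Long[] readRow(byte[] bytes, boolean[] declaredNullable) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long nullBits = buf.getLong(0);
        Long[] out = new Long[declaredNullable.length];
        for (int i = 0; i < declaredNullable.length; i++) {
            boolean isNull = declaredNullable[i] && ((nullBits >>> i) & 1L) == 1L;
            out[i] = isNull ? null : buf.getLong(8 + 8 * i);
        }
        return out;
    }

    public static void main(String[] args) {
        boolean[] notNull = {false, false}; // old schema: both fields NOT NULL
        boolean[] widened = {true, false};  // new schema: first field widened to nullable

        // Widening: bytes written under the old schema read back unchanged, no migration.
        byte[] oldBytes = writeRow(new Long[] {42L, 7L});
        System.out.println(Arrays.toString(readRow(oldBytes, widened))); // [42, 7]

        // Narrowing: a null written under the nullable schema silently reads back as 0.
        byte[] newBytes = writeRow(new Long[] {null, 7L});
        System.out.println(Arrays.toString(readRow(newBytes, notNull))); // [0, 7]
    }
}
```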
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java:
##########
@@ -50,11 +57,59 @@ public final class LogicalTypeUtils {
private static final String ATOMIC_FIELD_NAME = "f0";
private static final TimeAttributeRemover TIME_ATTRIBUTE_REMOVER = new TimeAttributeRemover();
+ private static final NullabilityNormalizer NULLABILITY_NORMALIZER = new NullabilityNormalizer();
public static LogicalType removeTimeAttributes(LogicalType logicalType) {
return logicalType.accept(TIME_ATTRIBUTE_REMOVER);
}
+ public static LogicalType normalizeNullability(LogicalType logicalType) {
+ return logicalType.accept(NULLABILITY_NORMALIZER);
+ }
+
+ /**
+ * Returns true when new types are structurally equal to old types ignoring nullability, and no
+ * type narrows from nullable to non-nullable.
+ */
+ public static boolean areTypesCompatibleAfterNullabilityWidening(
+ LogicalType[] newTypes, LogicalType[] oldTypes) {
+ if (newTypes == oldTypes) {
+ return true;
+ }
+ if (newTypes == null || oldTypes == null || newTypes.length != oldTypes.length) {
+ return false;
+ }
+ for (int i = 0; i < newTypes.length; i++) {
+ if (!isTypeCompatibleAfterNullabilityWidening(newTypes[i], oldTypes[i])) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean isTypeCompatibleAfterNullabilityWidening(
+ LogicalType newType, LogicalType oldType) {
+ if (newType == oldType) {
+ return true;
+ }
+ if (newType == null || oldType == null) {
+ return false;
+ }
+ return normalizeNullability(newType).equals(normalizeNullability(oldType))
+ && !hasNullabilityNarrowing(newType, oldType);
+ }
+
+ public static boolean isSerializerCompatibleAfterNullabilityWidening(
Review Comment:
This helper and the private `resolveSchemaCompatibility` below operate
purely on `TypeSerializer` / `TypeSerializerSnapshot`, whereas the rest of
`LogicalTypeUtils` works on logical types. As a result, this pulls the
serializer-snapshot framework into a logical-type utility in
flink-table-common. The type-level helpers (`normalizeNullability`,
`isTypeCompatibleAfterNullabilityWidening`,
`areTypesCompatibleAfterNullabilityWidening`) sit naturally here; would the
serializer helper fit better in a serializer-focused util nearer the
serializers (e.g. in flink-table-type-utils)? Curious whether there's a reason
to keep them together.
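The type-level helpers need nothing from the serializer framework, which is part of why they fit in `LogicalTypeUtils`. As a rough model of their intended semantics, here is a sketch on a hypothetical `ToyType` tree (not Flink's `LogicalType`). `hasNullabilityNarrowing` is not shown in the diff, so its body here is an assumption: it flags any position that is nullable in the old type but NOT NULL in the new one. The normalizer is likewise assumed to mark every node nullable; only the resulting equality matters.

```java
import java.util.List;

/** Toy model (hypothetical, not Flink's LogicalType) of the nullability-widening check. */
final class WideningCheckSketch {

    /** A type node: a root name, its nullability, and nested element/field types. */
    record ToyType(String name, boolean nullable, List<ToyType> children) {
        static ToyType of(String name, boolean nullable, ToyType... children) {
            return new ToyType(name, nullable, List.of(children));
        }
    }

    /** Marks every node nullable so that equality ignores nullability at any depth. */
    static ToyType normalizeNullability(ToyType t) {
        return new ToyType(
                t.name(),
                true,
                t.children().stream().map(WideningCheckSketch::normalizeNullability).toList());
    }

    /** True if some node is nullable in the old type but NOT NULL in the new one. */
    static boolean hasNullabilityNarrowing(ToyType newT, ToyType oldT) {
        if (oldT.nullable() && !newT.nullable()) {
            return true;
        }
        // Only called after the shapes are known to match, so children can be zipped by index.
        for (int i = 0; i < newT.children().size(); i++) {
            if (hasNullabilityNarrowing(newT.children().get(i), oldT.children().get(i))) {
                return true;
            }
        }
        return false;
    }

    static boolean isTypeCompatibleAfterNullabilityWidening(ToyType newT, ToyType oldT) {
        // Short-circuit: the narrowing walk runs only when the structures are equal.
        return normalizeNullability(newT).equals(normalizeNullability(oldT))
                && !hasNullabilityNarrowing(newT, oldT);
    }

    public static void main(String[] args) {
        ToyType oldT = ToyType.of("ARRAY", false, ToyType.of("INT", false)); // ARRAY<INT NOT NULL> NOT NULL
        ToyType widened = ToyType.of("ARRAY", true, ToyType.of("INT", true)); // ARRAY<INT>
        ToyType otherShape = ToyType.of("ARRAY", false, ToyType.of("BIGINT", false));
        System.out.println(isTypeCompatibleAfterNullabilityWidening(widened, oldT)); // true
        System.out.println(isTypeCompatibleAfterNullabilityWidening(oldT, widened)); // false (narrowing)
        System.out.println(isTypeCompatibleAfterNullabilityWidening(otherShape, oldT)); // false (INT vs BIGINT)
    }
}
```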
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerSnapshotTest.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isCompatibleAsIs;
+import static org.apache.flink.api.common.typeutils.TypeSerializerConditions.isIncompatible;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for serializer snapshots. */
+class SerializerSnapshotTest {
Review Comment:
The serializers under test (`RowDataSerializer`, `ArrayDataSerializer`,
`MapDataSerializer`) live in flink-table-type-utils, which already has
`RowDataSerializerTest` / `ArrayDataSerializerTest` / `MapDataSerializerTest`,
but this new test lands in flink-table-runtime. Is there a build reason it
can't sit alongside the code and the existing serializer tests in
flink-table-type-utils? Keeping it there would put the scaffolding next to what
it exercises. (The near-identical Python copy is harder to avoid, since that
lives in a separate module.)
--
This is an automated message from the Apache Git Service.