This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new dd80a1a  [FLINK-23420][table-runtime] LinkedListSerializer now checks for null elements in the list
dd80a1a is described below

commit dd80a1a5b2cddb3ceab6a01a2ef1a4924e6d4178
Author: tsreaper <[email protected]>
AuthorDate: Fri Aug 20 11:23:52 2021 +0800

    [FLINK-23420][table-runtime] LinkedListSerializer now checks for null elements in the list
    
    This closes #16873
---
 .../api/java/typeutils/runtime/MaskUtils.java      |  22 +++-
 .../runtime/typeutils/LinkedListSerializer.java    |  89 +++++++++++++--
 .../typeutils/LinkedListSerializerTest.java        |   8 +-
 .../typeutils/LinkedListSerializerUpgradeTest.java | 123 +++++++++++++++++++++
 .../serializer-snapshot                            | Bin 0 -> 206 bytes
 .../linked-list-serializer-1.13/test-data          | Bin 0 -> 44 bytes
 6 files changed, 224 insertions(+), 18 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
index 3802d82..faccc10 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
@@ -27,10 +27,13 @@ import java.io.IOException;
 @Internal
 public final class MaskUtils {
 
-    @SuppressWarnings("UnusedAssignment")
     public static void writeMask(boolean[] mask, DataOutputView target) throws 
IOException {
-        final int len = mask.length;
+        writeMask(mask, mask.length, target);
+    }
 
+    @SuppressWarnings("UnusedAssignment")
+    public static void writeMask(boolean[] mask, int len, DataOutputView 
target)
+            throws IOException {
         int b = 0x00;
         int bytePos = 0;
 
@@ -57,10 +60,13 @@ public final class MaskUtils {
         }
     }
 
-    @SuppressWarnings("UnusedAssignment")
     public static void readIntoMask(DataInputView source, boolean[] mask) 
throws IOException {
-        final int len = mask.length;
+        readIntoMask(source, mask, mask.length);
+    }
 
+    @SuppressWarnings("UnusedAssignment")
+    public static void readIntoMask(DataInputView source, boolean[] mask, int 
len)
+            throws IOException {
         int b = 0x00;
         int bytePos = 0;
 
@@ -80,11 +86,15 @@ public final class MaskUtils {
         }
     }
 
-    @SuppressWarnings("UnusedAssignment")
     public static void readIntoAndCopyMask(
             DataInputView source, DataOutputView target, boolean[] mask) 
throws IOException {
-        final int len = mask.length;
+        readIntoAndCopyMask(source, target, mask, mask.length);
+    }
 
+    @SuppressWarnings("UnusedAssignment")
+    public static void readIntoAndCopyMask(
+            DataInputView source, DataOutputView target, boolean[] mask, int 
len)
+            throws IOException {
         int b = 0x00;
         int bytePos = 0;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java
index df97203..ca0f3f9 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.MaskUtils;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -39,18 +40,27 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public final class LinkedListSerializer<T> extends 
TypeSerializer<LinkedList<T>> {
 
+    // legacy, don't touch until we drop support for 1.9 savepoints
     private static final long serialVersionUID = 1L;
 
-    /** The serializer for the elements of the list. */
+    // The serializer for the elements of the list.
     private final TypeSerializer<T> elementSerializer;
 
+    private final boolean hasNullMask;
+    private transient boolean[] reuseMask;
+
     /**
      * Creates a list serializer that uses the given serializer to serialize 
the list's elements.
      *
      * @param elementSerializer The serializer for the elements of the list
      */
     public LinkedListSerializer(TypeSerializer<T> elementSerializer) {
+        this(elementSerializer, true);
+    }
+
+    public LinkedListSerializer(TypeSerializer<T> elementSerializer, boolean 
hasNullMask) {
         this.elementSerializer = checkNotNull(elementSerializer);
+        this.hasNullMask = hasNullMask;
     }
 
     // ------------------------------------------------------------------------
@@ -77,10 +87,7 @@ public final class LinkedListSerializer<T> extends 
TypeSerializer<LinkedList<T>>
 
     @Override
     public TypeSerializer<LinkedList<T>> duplicate() {
-        TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
-        return duplicateElement == elementSerializer
-                ? this
-                : new LinkedListSerializer<>(duplicateElement);
+        return new LinkedListSerializer<>(elementSerializer.duplicate(), 
hasNullMask);
     }
 
     @Override
@@ -92,7 +99,12 @@ public final class LinkedListSerializer<T> extends 
TypeSerializer<LinkedList<T>>
     public LinkedList<T> copy(LinkedList<T> from) {
         LinkedList<T> newList = new LinkedList<>();
         for (T element : from) {
-            newList.add(elementSerializer.copy(element));
+            // there is no compatibility problem here as it only copies from 
memory to memory
+            if (element == null) {
+                newList.add(null);
+            } else {
+                newList.add(elementSerializer.copy(element));
+            }
         }
         return newList;
     }
@@ -107,20 +119,49 @@ public final class LinkedListSerializer<T> extends 
TypeSerializer<LinkedList<T>>
         return -1; // var length
     }
 
+    private void ensureReuseMaskLength(int len) {
+        if (reuseMask == null || reuseMask.length < len) {
+            reuseMask = new boolean[len];
+        }
+    }
+
     @Override
     public void serialize(LinkedList<T> list, DataOutputView target) throws 
IOException {
         target.writeInt(list.size());
+        if (hasNullMask) {
+            ensureReuseMaskLength(list.size());
+            MaskUtils.writeMask(getNullMask(list), list.size(), target);
+        }
         for (T element : list) {
-            elementSerializer.serialize(element, target);
+            if (element != null) {
+                elementSerializer.serialize(element, target);
+            }
+        }
+    }
+
+    private boolean[] getNullMask(LinkedList<T> list) {
+        int idx = 0;
+        for (T item : list) {
+            reuseMask[idx] = item == null;
+            idx++;
         }
+        return reuseMask;
     }
 
     @Override
     public LinkedList<T> deserialize(DataInputView source) throws IOException {
         final int size = source.readInt();
         final LinkedList<T> list = new LinkedList<>();
+        if (hasNullMask) {
+            ensureReuseMaskLength(size);
+            MaskUtils.readIntoMask(source, reuseMask, size);
+        }
         for (int i = 0; i < size; i++) {
-            list.add(elementSerializer.deserialize(source));
+            if (hasNullMask && reuseMask[i]) {
+                list.add(null);
+            } else {
+                list.add(elementSerializer.deserialize(source));
+            }
         }
         return list;
     }
@@ -135,8 +176,14 @@ public final class LinkedListSerializer<T> extends 
TypeSerializer<LinkedList<T>>
         // copy number of elements
         final int num = source.readInt();
         target.writeInt(num);
+        if (hasNullMask) {
+            ensureReuseMaskLength(num);
+            MaskUtils.readIntoAndCopyMask(source, target, reuseMask, num);
+        }
         for (int i = 0; i < num; i++) {
-            elementSerializer.copy(source, target);
+            if (!(hasNullMask && reuseMask[i])) {
+                elementSerializer.copy(source, target);
+            }
         }
     }
 
@@ -169,7 +216,11 @@ public final class LinkedListSerializer<T> extends 
TypeSerializer<LinkedList<T>>
     public static class LinkedListSerializerSnapshot<T>
             extends CompositeTypeSerializerSnapshot<LinkedList<T>, 
LinkedListSerializer<T>> {
 
-        private static final int CURRENT_VERSION = 1;
+        private static final int CURRENT_VERSION = 2;
+
+        private static final int FIRST_VERSION_WITH_NULL_MASK = 2;
+
+        private int readVersion = CURRENT_VERSION;
 
         /** Constructor for read instantiation. */
         public LinkedListSerializerSnapshot() {
@@ -187,11 +238,27 @@ public final class LinkedListSerializer<T> extends 
TypeSerializer<LinkedList<T>>
         }
 
         @Override
+        protected void readOuterSnapshot(
+                int readOuterSnapshotVersion, DataInputView in, ClassLoader 
userCodeClassLoader) {
+            readVersion = readOuterSnapshotVersion;
+        }
+
+        @Override
+        protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
+                LinkedListSerializer<T> newSerializer) {
+            if (readVersion < FIRST_VERSION_WITH_NULL_MASK) {
+                return OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION;
+            }
+            return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
+        }
+
+        @Override
         protected LinkedListSerializer<T> 
createOuterSerializerWithNestedSerializers(
                 TypeSerializer<?>[] nestedSerializers) {
             @SuppressWarnings("unchecked")
             TypeSerializer<T> elementSerializer = (TypeSerializer<T>) 
nestedSerializers[0];
-            return new LinkedListSerializer<>(elementSerializer);
+            return new LinkedListSerializer<>(
+                    elementSerializer, readVersion >= 
FIRST_VERSION_WITH_NULL_MASK);
         }
 
         @Override
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerTest.java
index eea1556..4e7ab1d 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerTest.java
@@ -67,6 +67,12 @@ public class LinkedListSerializerTest extends 
SerializerTestBase<LinkedList<Long
             list4.add(rnd.nextLong());
         }
 
-        return (LinkedList<Long>[]) new LinkedList[] {list1, list2, list3, 
list4};
+        // list with null values
+        final LinkedList<Long> list5 = new LinkedList<>();
+        for (int i = 0; i < rnd.nextInt(200); i++) {
+            list5.add(rnd.nextBoolean() ? null : rnd.nextLong());
+        }
+
+        return (LinkedList<Long>[]) new LinkedList[] {list1, list2, list3, 
list4, list5};
     }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerUpgradeTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerUpgradeTest.java
new file mode 100644
index 0000000..ef34e4a
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerUpgradeTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.hamcrest.Matcher;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+
+import static org.hamcrest.Matchers.is;
+
+/** A {@link TypeSerializerUpgradeTestBase} for {@link LinkedListSerializer}. */
+@RunWith(Parameterized.class)
+public class LinkedListSerializerUpgradeTest
+        extends TypeSerializerUpgradeTestBase<LinkedList<Long>, 
LinkedList<Long>> {
+
+    public LinkedListSerializerUpgradeTest(
+            TestSpecification<LinkedList<Long>, LinkedList<Long>> 
testSpecification) {
+        super(testSpecification);
+    }
+
+    @Parameterized.Parameters(name = "Test Specification = {0}")
+    public static Collection<TestSpecification<?, ?>> testSpecifications() 
throws Exception {
+        return Collections.singletonList(
+                new TestSpecification<>(
+                        "linked-list-serializer",
+                        MigrationVersion.v1_13,
+                        LinkedListSerializerSetup.class,
+                        LinkedListSerializerVerifier.class));
+    }
+
+    public static TypeSerializer<LinkedList<Long>> 
createLinkedListSerializer() {
+        return new LinkedListSerializer<>(new LongSerializer());
+    }
+
+    // ----------------------------------------------------------------------------------------------
+    //  Specification for "linked-list-serializer"
+    // ----------------------------------------------------------------------------------------------
+
+    /**
+     * This class is only public to work with {@link
+     * org.apache.flink.api.common.typeutils.ClassRelocator}.
+     */
+    public static final class LinkedListSerializerSetup
+            implements 
TypeSerializerUpgradeTestBase.PreUpgradeSetup<LinkedList<Long>> {
+
+        @Override
+        public TypeSerializer<LinkedList<Long>> createPriorSerializer() {
+            return createLinkedListSerializer();
+        }
+
+        @Override
+        public LinkedList<Long> createTestData() {
+            LinkedList<Long> list = new LinkedList<>();
+            list.add(42L);
+            list.add(-42L);
+            list.add(0L);
+            list.add(Long.MAX_VALUE);
+            list.add(Long.MIN_VALUE);
+            return list;
+        }
+    }
+
+    /**
+     * This class is only public to work with {@link
+     * org.apache.flink.api.common.typeutils.ClassRelocator}.
+     */
+    public static final class LinkedListSerializerVerifier
+            implements 
TypeSerializerUpgradeTestBase.UpgradeVerifier<LinkedList<Long>> {
+
+        @Override
+        public TypeSerializer<LinkedList<Long>> createUpgradedSerializer() {
+            return createLinkedListSerializer();
+        }
+
+        @Override
+        public Matcher<LinkedList<Long>> testDataMatcher() {
+            LinkedList<Long> list = new LinkedList<>();
+            list.add(42L);
+            list.add(-42L);
+            list.add(0L);
+            list.add(Long.MAX_VALUE);
+            list.add(Long.MIN_VALUE);
+            return is(list);
+        }
+
+        @Override
+        public Matcher<TypeSerializerSchemaCompatibility<LinkedList<Long>>>
+                schemaCompatibilityMatcher(MigrationVersion version) {
+            if (version.isNewerVersionThan(MigrationVersion.v1_13)) {
+                return TypeSerializerMatchers.isCompatibleAsIs();
+            } else {
+                return TypeSerializerMatchers.isCompatibleAfterMigration();
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/serializer-snapshot
 
b/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/serializer-snapshot
new file mode 100644
index 0000000..42e497b
Binary files /dev/null and 
b/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/serializer-snapshot
 differ
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/test-data
 
b/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/test-data
new file mode 100644
index 0000000..c9a4d87e
Binary files /dev/null and 
b/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/test-data
 differ

Reply via email to