This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new dd80a1a [FLINK-23420][table-runtime] LinkedListSerializer now checks for null elements in the list
dd80a1a is described below
commit dd80a1a5b2cddb3ceab6a01a2ef1a4924e6d4178
Author: tsreaper <[email protected]>
AuthorDate: Fri Aug 20 11:23:52 2021 +0800
[FLINK-23420][table-runtime] LinkedListSerializer now checks for null elements in the list
This closes #16873
---
.../api/java/typeutils/runtime/MaskUtils.java | 22 +++-
.../runtime/typeutils/LinkedListSerializer.java | 89 +++++++++++++--
.../typeutils/LinkedListSerializerTest.java | 8 +-
.../typeutils/LinkedListSerializerUpgradeTest.java | 123 +++++++++++++++++++++
.../serializer-snapshot | Bin 0 -> 206 bytes
.../linked-list-serializer-1.13/test-data | Bin 0 -> 44 bytes
6 files changed, 224 insertions(+), 18 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
index 3802d82..faccc10 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
@@ -27,10 +27,13 @@ import java.io.IOException;
@Internal
public final class MaskUtils {
- @SuppressWarnings("UnusedAssignment")
public static void writeMask(boolean[] mask, DataOutputView target) throws
IOException {
- final int len = mask.length;
+ writeMask(mask, mask.length, target);
+ }
+ @SuppressWarnings("UnusedAssignment")
+ public static void writeMask(boolean[] mask, int len, DataOutputView
target)
+ throws IOException {
int b = 0x00;
int bytePos = 0;
@@ -57,10 +60,13 @@ public final class MaskUtils {
}
}
- @SuppressWarnings("UnusedAssignment")
public static void readIntoMask(DataInputView source, boolean[] mask)
throws IOException {
- final int len = mask.length;
+ readIntoMask(source, mask, mask.length);
+ }
+ @SuppressWarnings("UnusedAssignment")
+ public static void readIntoMask(DataInputView source, boolean[] mask, int
len)
+ throws IOException {
int b = 0x00;
int bytePos = 0;
@@ -80,11 +86,15 @@ public final class MaskUtils {
}
}
- @SuppressWarnings("UnusedAssignment")
public static void readIntoAndCopyMask(
DataInputView source, DataOutputView target, boolean[] mask)
throws IOException {
- final int len = mask.length;
+ readIntoAndCopyMask(source, target, mask, mask.length);
+ }
+ @SuppressWarnings("UnusedAssignment")
+ public static void readIntoAndCopyMask(
+ DataInputView source, DataOutputView target, boolean[] mask, int
len)
+ throws IOException {
int b = 0x00;
int bytePos = 0;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java
index df97203..ca0f3f9 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializer.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.MaskUtils;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -39,18 +40,27 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public final class LinkedListSerializer<T> extends
TypeSerializer<LinkedList<T>> {
+ // legacy, don't touch until we drop support for 1.9 savepoints
private static final long serialVersionUID = 1L;
- /** The serializer for the elements of the list. */
+ // The serializer for the elements of the list.
private final TypeSerializer<T> elementSerializer;
+ private final boolean hasNullMask;
+ private transient boolean[] reuseMask;
+
/**
* Creates a list serializer that uses the given serializer to serialize
the list's elements.
*
* @param elementSerializer The serializer for the elements of the list
*/
public LinkedListSerializer(TypeSerializer<T> elementSerializer) {
+ this(elementSerializer, true);
+ }
+
+ public LinkedListSerializer(TypeSerializer<T> elementSerializer, boolean
hasNullMask) {
this.elementSerializer = checkNotNull(elementSerializer);
+ this.hasNullMask = hasNullMask;
}
// ------------------------------------------------------------------------
@@ -77,10 +87,7 @@ public final class LinkedListSerializer<T> extends TypeSerializer<LinkedList<T>>
@Override
public TypeSerializer<LinkedList<T>> duplicate() {
- TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
- return duplicateElement == elementSerializer
- ? this
- : new LinkedListSerializer<>(duplicateElement);
+ return new LinkedListSerializer<>(elementSerializer.duplicate(),
hasNullMask);
}
@Override
@@ -92,7 +99,12 @@ public final class LinkedListSerializer<T> extends TypeSerializer<LinkedList<T>>
public LinkedList<T> copy(LinkedList<T> from) {
LinkedList<T> newList = new LinkedList<>();
for (T element : from) {
- newList.add(elementSerializer.copy(element));
+ // there is no compatibility problem here as it only copies from
memory to memory
+ if (element == null) {
+ newList.add(null);
+ } else {
+ newList.add(elementSerializer.copy(element));
+ }
}
return newList;
}
@@ -107,20 +119,49 @@ public final class LinkedListSerializer<T> extends TypeSerializer<LinkedList<T>>
return -1; // var length
}
+ private void ensureReuseMaskLength(int len) {
+ if (reuseMask == null || reuseMask.length < len) {
+ reuseMask = new boolean[len];
+ }
+ }
+
@Override
public void serialize(LinkedList<T> list, DataOutputView target) throws
IOException {
target.writeInt(list.size());
+ if (hasNullMask) {
+ ensureReuseMaskLength(list.size());
+ MaskUtils.writeMask(getNullMask(list), list.size(), target);
+ }
for (T element : list) {
- elementSerializer.serialize(element, target);
+ if (element != null) {
+ elementSerializer.serialize(element, target);
+ }
+ }
+ }
+
+ private boolean[] getNullMask(LinkedList<T> list) {
+ int idx = 0;
+ for (T item : list) {
+ reuseMask[idx] = item == null;
+ idx++;
}
+ return reuseMask;
}
@Override
public LinkedList<T> deserialize(DataInputView source) throws IOException {
final int size = source.readInt();
final LinkedList<T> list = new LinkedList<>();
+ if (hasNullMask) {
+ ensureReuseMaskLength(size);
+ MaskUtils.readIntoMask(source, reuseMask, size);
+ }
for (int i = 0; i < size; i++) {
- list.add(elementSerializer.deserialize(source));
+ if (hasNullMask && reuseMask[i]) {
+ list.add(null);
+ } else {
+ list.add(elementSerializer.deserialize(source));
+ }
}
return list;
}
@@ -135,8 +176,14 @@ public final class LinkedListSerializer<T> extends TypeSerializer<LinkedList<T>>
// copy number of elements
final int num = source.readInt();
target.writeInt(num);
+ if (hasNullMask) {
+ ensureReuseMaskLength(num);
+ MaskUtils.readIntoAndCopyMask(source, target, reuseMask, num);
+ }
for (int i = 0; i < num; i++) {
- elementSerializer.copy(source, target);
+ if (!(hasNullMask && reuseMask[i])) {
+ elementSerializer.copy(source, target);
+ }
}
}
@@ -169,7 +216,11 @@ public final class LinkedListSerializer<T> extends TypeSerializer<LinkedList<T>>
public static class LinkedListSerializerSnapshot<T>
extends CompositeTypeSerializerSnapshot<LinkedList<T>,
LinkedListSerializer<T>> {
- private static final int CURRENT_VERSION = 1;
+ private static final int CURRENT_VERSION = 2;
+
+ private static final int FIRST_VERSION_WITH_NULL_MASK = 2;
+
+ private int readVersion = CURRENT_VERSION;
/** Constructor for read instantiation. */
public LinkedListSerializerSnapshot() {
@@ -187,11 +238,27 @@ public final class LinkedListSerializer<T> extends TypeSerializer<LinkedList<T>>
}
@Override
+ protected void readOuterSnapshot(
+ int readOuterSnapshotVersion, DataInputView in, ClassLoader
userCodeClassLoader) {
+ readVersion = readOuterSnapshotVersion;
+ }
+
+ @Override
+ protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
+ LinkedListSerializer<T> newSerializer) {
+ if (readVersion < FIRST_VERSION_WITH_NULL_MASK) {
+ return OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION;
+ }
+ return OuterSchemaCompatibility.COMPATIBLE_AS_IS;
+ }
+
+ @Override
protected LinkedListSerializer<T>
createOuterSerializerWithNestedSerializers(
TypeSerializer<?>[] nestedSerializers) {
@SuppressWarnings("unchecked")
TypeSerializer<T> elementSerializer = (TypeSerializer<T>)
nestedSerializers[0];
- return new LinkedListSerializer<>(elementSerializer);
+ return new LinkedListSerializer<>(
+ elementSerializer, readVersion >=
FIRST_VERSION_WITH_NULL_MASK);
}
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerTest.java
index eea1556..4e7ab1d 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerTest.java
@@ -67,6 +67,12 @@ public class LinkedListSerializerTest extends SerializerTestBase<LinkedList<Long
list4.add(rnd.nextLong());
}
- return (LinkedList<Long>[]) new LinkedList[] {list1, list2, list3,
list4};
+ // list with null values
+ final LinkedList<Long> list5 = new LinkedList<>();
+ for (int i = 0; i < rnd.nextInt(200); i++) {
+ list5.add(rnd.nextBoolean() ? null : rnd.nextLong());
+ }
+
+ return (LinkedList<Long>[]) new LinkedList[] {list1, list2, list3,
list4, list5};
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerUpgradeTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerUpgradeTest.java
new file mode 100644
index 0000000..ef34e4a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/LinkedListSerializerUpgradeTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.hamcrest.Matcher;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+
+import static org.hamcrest.Matchers.is;
+
+/** A {@link TypeSerializerUpgradeTestBase} for {@link LinkedListSerializer}.
*/
+@RunWith(Parameterized.class)
+public class LinkedListSerializerUpgradeTest
+ extends TypeSerializerUpgradeTestBase<LinkedList<Long>,
LinkedList<Long>> {
+
+ public LinkedListSerializerUpgradeTest(
+ TestSpecification<LinkedList<Long>, LinkedList<Long>>
testSpecification) {
+ super(testSpecification);
+ }
+
+ @Parameterized.Parameters(name = "Test Specification = {0}")
+ public static Collection<TestSpecification<?, ?>> testSpecifications()
throws Exception {
+ return Collections.singletonList(
+ new TestSpecification<>(
+ "linked-list-serializer",
+ MigrationVersion.v1_13,
+ LinkedListSerializerSetup.class,
+ LinkedListSerializerVerifier.class));
+ }
+
+ public static TypeSerializer<LinkedList<Long>>
createLinkedListSerializer() {
+ return new LinkedListSerializer<>(new LongSerializer());
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // Specification for "linked-list-serializer"
+ // ----------------------------------------------------------------------------------------------
+
+ /**
+ * This class is only public to work with {@link
+ * org.apache.flink.api.common.typeutils.ClassRelocator}.
+ */
+ public static final class LinkedListSerializerSetup
+ implements
TypeSerializerUpgradeTestBase.PreUpgradeSetup<LinkedList<Long>> {
+
+ @Override
+ public TypeSerializer<LinkedList<Long>> createPriorSerializer() {
+ return createLinkedListSerializer();
+ }
+
+ @Override
+ public LinkedList<Long> createTestData() {
+ LinkedList<Long> list = new LinkedList<>();
+ list.add(42L);
+ list.add(-42L);
+ list.add(0L);
+ list.add(Long.MAX_VALUE);
+ list.add(Long.MIN_VALUE);
+ return list;
+ }
+ }
+
+ /**
+ * This class is only public to work with {@link
+ * org.apache.flink.api.common.typeutils.ClassRelocator}.
+ */
+ public static final class LinkedListSerializerVerifier
+ implements
TypeSerializerUpgradeTestBase.UpgradeVerifier<LinkedList<Long>> {
+
+ @Override
+ public TypeSerializer<LinkedList<Long>> createUpgradedSerializer() {
+ return createLinkedListSerializer();
+ }
+
+ @Override
+ public Matcher<LinkedList<Long>> testDataMatcher() {
+ LinkedList<Long> list = new LinkedList<>();
+ list.add(42L);
+ list.add(-42L);
+ list.add(0L);
+ list.add(Long.MAX_VALUE);
+ list.add(Long.MIN_VALUE);
+ return is(list);
+ }
+
+ @Override
+ public Matcher<TypeSerializerSchemaCompatibility<LinkedList<Long>>>
+ schemaCompatibilityMatcher(MigrationVersion version) {
+ if (version.isNewerVersionThan(MigrationVersion.v1_13)) {
+ return TypeSerializerMatchers.isCompatibleAsIs();
+ } else {
+ return TypeSerializerMatchers.isCompatibleAfterMigration();
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/serializer-snapshot b/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/serializer-snapshot
new file mode 100644
index 0000000..42e497b
Binary files /dev/null and b/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/serializer-snapshot differ
diff --git a/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/test-data b/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/test-data
new file mode 100644
index 0000000..c9a4d87e
Binary files /dev/null and b/flink-table/flink-table-runtime-blink/src/test/resources/linked-list-serializer-1.13/test-data differ