This is an automated email from the ASF dual-hosted git repository.

xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b460d3d6fe0 [FLINK-36004] Deep copy the BinaryArrayData in ArrayDataSerializer#copy while using custom ArrayData (#25196)
b460d3d6fe0 is described below

commit b460d3d6fe00a18342783113635d1d80774fefe6
Author: Xuyang <[email protected]>
AuthorDate: Fri Aug 16 11:30:48 2024 +0800

    [FLINK-36004] Deep copy the BinaryArrayData in ArrayDataSerializer#copy while using custom ArrayData (#25196)
    
    * [FLINK-36004] Deep copy the BinaryArrayData in ArrayDataSerializer#copy while using custom ArrayData
    
    * check all subclasses of TypeSerializer manually and refactor the test
---
 .../api/common/typeutils/SerializerTestBase.java     | 20 +++++++++++++++++++-
 .../table/runtime/typeutils/ArrayDataSerializer.java |  2 +-
 .../table/runtime/typeutils/MapDataSerializer.java   |  1 +
 .../runtime/typeutils/ArrayDataSerializerTest.java   |  1 +
 .../runtime/typeutils/MapDataSerializerTest.java     |  3 +++
 5 files changed, 25 insertions(+), 2 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index d27e349f0b1..167b59211f0 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -175,6 +175,24 @@ public abstract class SerializerTestBase<T> {
         }
     }
 
+    @Test
+    void testCopyIndependently() {
+        TypeSerializer<T> serializer = getSerializer();
+        T[] originalData = getData();
+        List<T> copiedData = new ArrayList<>(originalData.length);
+
+        for (T datum : originalData) {
+            T copy = serializer.copy(datum, serializer.createInstance());
+            copiedData.add(copy);
+        }
+
+        for (int i = 0; i < originalData.length; i++) {
+            T original = originalData[i];
+            T copied = copiedData.get(i);
+            deepEquals("Copied element is not equal to the original element.", 
original, copied);
+        }
+    }
+
     @Test
     void testCopyIntoNewElements() {
 
@@ -427,7 +445,7 @@ public abstract class SerializerTestBase<T> {
 
     // 
--------------------------------------------------------------------------------------------
 
-    private void deepEquals(String message, T should, T is) {
+    protected void deepEquals(String message, T should, T is) {
         assertThat(is)
                 .as(message)
                 
.matches(CustomEqualityMatcher.deeplyEquals(should).withChecker(checker));
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
index 61c63c8defe..98c6bd8d297 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
@@ -89,7 +89,7 @@ public class ArrayDataSerializer extends 
TypeSerializer<ArrayData> {
         } else if (from instanceof BinaryArrayData) {
             return ((BinaryArrayData) from).copy();
         } else {
-            return toBinaryArray(from);
+            return toBinaryArray(from).copy();
         }
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
index 53af778f3d1..c667777c896 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
@@ -106,6 +106,7 @@ public class MapDataSerializer extends 
TypeSerializer<MapData> {
         if (from instanceof BinaryMapData) {
             return ((BinaryMapData) from).copy();
         } else {
+            // the returned value has been copied
             return toBinaryMap(from);
         }
     }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
index f4914e822e5..2c1b5ee1616 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
@@ -90,6 +90,7 @@ class ArrayDataSerializerTest extends 
SerializerTestBase<ArrayData> {
             createArray("11", "lele", "haa", "ke"),
             createColumnarArray("11", "lele", "haa", "ke"),
             createCustomTypeArray("11", "lele", "haa", "ke"),
+            createCustomTypeArray("111", "lelele", "haaa", "kee")
         };
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
index 9442b89e826..67d8518f19e 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
@@ -84,9 +84,12 @@ public class MapDataSerializerTest extends 
SerializerTestBase<MapData> {
     protected MapData[] getTestData() {
         Map<Object, Object> first = new HashMap<>();
         first.put(1, StringData.fromString(""));
+        Map<Object, Object> second = new HashMap<>();
+        second.put(2, StringData.fromString(""));
         return new MapData[] {
             new GenericMapData(first),
             new CustomMapData(first),
+            new CustomMapData(second),
             BinaryMapData.valueOf(
                     createArray(1, 2), 
ArrayDataSerializerTest.createArray("11", "haa")),
             BinaryMapData.valueOf(

Reply via email to