This is an automated email from the ASF dual-hosted git repository.
xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b460d3d6fe0 [FLINK-36004] Deep copy the BinaryArrayData in
ArrayDataSerializer#copy while using custom ArrayData (#25196)
b460d3d6fe0 is described below
commit b460d3d6fe00a18342783113635d1d80774fefe6
Author: Xuyang <[email protected]>
AuthorDate: Fri Aug 16 11:30:48 2024 +0800
[FLINK-36004] Deep copy the BinaryArrayData in ArrayDataSerializer#copy
while using custom ArrayData (#25196)
* [FLINK-36004] Deep copy the BinaryArrayData in ArrayDataSerializer#copy
while using custom ArrayData
* check all subclasses of TypeSerializer manually and refactor the test
---
.../api/common/typeutils/SerializerTestBase.java | 20 +++++++++++++++++++-
.../table/runtime/typeutils/ArrayDataSerializer.java | 2 +-
.../table/runtime/typeutils/MapDataSerializer.java | 1 +
.../runtime/typeutils/ArrayDataSerializerTest.java | 1 +
.../runtime/typeutils/MapDataSerializerTest.java | 3 +++
5 files changed, 25 insertions(+), 2 deletions(-)
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index d27e349f0b1..167b59211f0 100644
---
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -175,6 +175,24 @@ public abstract class SerializerTestBase<T> {
}
}
+ @Test
+ void testCopyIndependently() {
+ TypeSerializer<T> serializer = getSerializer();
+ T[] originalData = getData();
+ List<T> copiedData = new ArrayList<>(originalData.length);
+
+ for (T datum : originalData) {
+ T copy = serializer.copy(datum, serializer.createInstance());
+ copiedData.add(copy);
+ }
+
+ for (int i = 0; i < originalData.length; i++) {
+ T original = originalData[i];
+ T copied = copiedData.get(i);
+ deepEquals("Copied element is not equal to the original element.",
original, copied);
+ }
+ }
+
@Test
void testCopyIntoNewElements() {
@@ -427,7 +445,7 @@ public abstract class SerializerTestBase<T> {
// --------------------------------------------------------------------------------------------
- private void deepEquals(String message, T should, T is) {
+ protected void deepEquals(String message, T should, T is) {
assertThat(is)
.as(message)
.matches(CustomEqualityMatcher.deeplyEquals(should).withChecker(checker));
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
index 61c63c8defe..98c6bd8d297 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
@@ -89,7 +89,7 @@ public class ArrayDataSerializer extends
TypeSerializer<ArrayData> {
} else if (from instanceof BinaryArrayData) {
return ((BinaryArrayData) from).copy();
} else {
- return toBinaryArray(from);
+ return toBinaryArray(from).copy();
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
index 53af778f3d1..c667777c896 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
@@ -106,6 +106,7 @@ public class MapDataSerializer extends
TypeSerializer<MapData> {
if (from instanceof BinaryMapData) {
return ((BinaryMapData) from).copy();
} else {
+ // toBinaryMap(from) already returns a copied value, so no extra copy() is needed
return toBinaryMap(from);
}
}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
index f4914e822e5..2c1b5ee1616 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializerTest.java
@@ -90,6 +90,7 @@ class ArrayDataSerializerTest extends
SerializerTestBase<ArrayData> {
createArray("11", "lele", "haa", "ke"),
createColumnarArray("11", "lele", "haa", "ke"),
createCustomTypeArray("11", "lele", "haa", "ke"),
+ createCustomTypeArray("111", "lelele", "haaa", "kee")
};
}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
index 9442b89e826..67d8518f19e 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/MapDataSerializerTest.java
@@ -84,9 +84,12 @@ public class MapDataSerializerTest extends
SerializerTestBase<MapData> {
protected MapData[] getTestData() {
Map<Object, Object> first = new HashMap<>();
first.put(1, StringData.fromString(""));
+ Map<Object, Object> second = new HashMap<>();
+ second.put(2, StringData.fromString(""));
return new MapData[] {
new GenericMapData(first),
new CustomMapData(first),
+ new CustomMapData(second),
BinaryMapData.valueOf(
createArray(1, 2),
ArrayDataSerializerTest.createArray("11", "haa")),
BinaryMapData.valueOf(