This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new f5ac8c3  [FLINK-18168][table-runtime-blink] Fix array reuse for BinaryArrayData in converters
f5ac8c3 is described below

commit f5ac8c352b7fb5aff5a78cffa348f72bd8492509
Author: zoudan <[email protected]>
AuthorDate: Tue Jun 9 16:12:11 2020 +0800

    [FLINK-18168][table-runtime-blink] Fix array reuse for BinaryArrayData in converters
    
    This closes #12542.
---
 .../data/conversion/ArrayObjectArrayConverter.java    |  2 +-
 .../flink/table/data/util/DataFormatConverters.java   |  2 +-
 .../flink/table/data/DataFormatConvertersTest.java    | 12 +++++++++++-
 .../flink/table/data/DataStructureConvertersTest.java | 19 +++++++++++++++++++
 4 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
index 5049064..c191c55 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
@@ -115,7 +115,7 @@ public class ArrayObjectArrayConverter<E> implements 
DataStructureConverter<Arra
                for (int pos = 0; pos < length; pos++) {
                        writeElement(pos, external[pos]);
                }
-               return completeWriter();
+               return completeWriter().copy();
        }
 
        private E[] toJavaArray(ArrayData internal) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
index 347a517..acef60d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
@@ -1138,7 +1138,7 @@ public class DataFormatConverters {
                                }
                        }
                        reuseWriter.complete();
-                       return reuseArray;
+                       return reuseArray.copy();
                }
 
                @Override
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
index 82e33d1..ac53aa2 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
@@ -148,9 +148,18 @@ public class DataFormatConvertersTest {
        }
 
        private static void test(TypeInformation typeInfo, Object value) {
+               test(typeInfo, value, null);
+       }
+
+       private static void test(TypeInformation typeInfo, Object value, Object 
anotherValue) {
                DataFormatConverter converter = getConverter(typeInfo);
+               final Object innerValue = converter.toInternal(value);
+               if (anotherValue != null) {
+                       converter.toInternal(anotherValue);
+               }
+
                Assert.assertTrue(Arrays.deepEquals(
-                               new Object[] 
{converter.toExternal(converter.toInternal(value))}, new Object[] {value}));
+                       new Object[] {converter.toExternal(innerValue)}, new 
Object[]{value}));
        }
 
        private static DataFormatConverter getConverter(DataType dataType) {
@@ -193,6 +202,7 @@ public class DataFormatConvertersTest {
                test(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, new Double[] 
{null, null});
                test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] 
{null, null});
                test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] 
{"haha", "hehe"});
+               test(ObjectArrayTypeInfo.getInfoFor(Types.STRING), new String[] 
{"haha", "hehe"}, new String[] {"aa", "bb"});
                test(new MapTypeInfo<>(Types.STRING, Types.INT), null);
 
                HashMap<String, Integer> map = new HashMap<>();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index 610d53c..29710fa 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -345,6 +345,12 @@ public class DataStructureConvertersTest {
                                                        null
                                                )
                                        })
+                               .convertedToWithAnotherValue(
+                                       Row[].class,
+                                       new Row[] {
+                                               Row.of(null, null),
+                                               Row.of(new 
PojoWithImmutableFields(10, "Bob"), null)
+                                       })
                );
        }
 
@@ -369,6 +375,11 @@ public class DataStructureConvertersTest {
 
                        final Object internalValue = 
fromConverter.toInternalOrNull(from.getValue());
 
+                       final Object anotherValue = 
testSpec.conversionsWithAnotherValue.get(from.getKey());
+                       if (anotherValue != null) {
+                               fromConverter.toInternalOrNull(anotherValue);
+                       }
+
                        for (Map.Entry<Class<?>, Object> to : 
testSpec.conversions.entrySet()) {
                                final DataType toDataType = 
testSpec.dataType.bridgedTo(to.getKey());
 
@@ -395,12 +406,15 @@ public class DataStructureConvertersTest {
 
                private final Map<Class<?>, Object> conversions;
 
+               private final Map<Class<?>, Object> conversionsWithAnotherValue;
+
                private @Nullable String expectedErrorMessage;
 
                private TestSpec(String description, DataType dataType) {
                        this.description = description;
                        this.dataType = dataType;
                        this.conversions = new LinkedHashMap<>();
+                       this.conversionsWithAnotherValue = new 
LinkedHashMap<>();
                }
 
                static TestSpec forDataType(AbstractDataType<?> dataType) {
@@ -420,6 +434,11 @@ public class DataStructureConvertersTest {
                        return this;
                }
 
+               <T> TestSpec convertedToWithAnotherValue(Class<T> clazz, T 
value) {
+                       conversionsWithAnotherValue.put(clazz, value);
+                       return this;
+               }
+
                <T> TestSpec convertedToSupplier(Class<T> clazz, Supplier<T> 
supplier) {
                        conversions.put(clazz, supplier.get());
                        return this;

Reply via email to