This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a6adbdda0cd [FLINK-31835][table-planner] Fix the array type that can't be converted from the external primitive array
a6adbdda0cd is described below

commit a6adbdda0cdf90635f0cd7a3427486bced301fbd
Author: Aitozi <[email protected]>
AuthorDate: Mon Jun 12 17:52:42 2023 +0800

    [FLINK-31835][table-planner] Fix the array type that can't be converted from the external primitive array
    
    This closes #22485
---
 .../flink/table/types/CollectionDataType.java      | 24 ++++++++-
 .../org/apache/flink/table/types/DataTypeTest.java | 14 +++++
 .../planner/functions/CastFunctionITCase.java      |  2 +-
 .../planner/runtime/stream/sql/FunctionITCase.java | 60 ++++++++++++++++++++++
 .../planner/runtime/stream/table/ValuesITCase.java | 18 ++++++-
 .../table/data/DataStructureConvertersTest.java    | 24 ++++++++-
 .../flink/table/test/TableAssertionTest.java       |  2 +-
 7 files changed, 137 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
index 239e36eb201..9188530a4ff 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.lang3.ClassUtils;
+
 import javax.annotation.Nullable;
 
 import java.lang.reflect.Array;
@@ -118,9 +120,27 @@ public final class CollectionDataType extends DataType {
         // arrays are a special case because their default conversion class 
depends on the
         // conversion class of the element type
         if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz == 
null) {
-            return Array.newInstance(elementDataType.getConversionClass(), 
0).getClass();
+            Class<?> conversionClass =
+                    wrapOrUnWrap(
+                            elementDataType.getConversionClass(),
+                            elementDataType.getLogicalType().isNullable());
+
+            return Array.newInstance(conversionClass, 0).getClass();
+        }
+        return wrapOrUnWrap(clazz, 
elementDataType.getLogicalType().isNullable());
+    }
+
+    private static Class<?> wrapOrUnWrap(@Nullable Class<?> source, boolean 
nullable) {
+        if (source == null) {
+            return null;
+        }
+        if (nullable) {
+            return source.isPrimitive() ? 
ClassUtils.primitiveToWrapper(source) : source;
+        } else {
+            return ClassUtils.isPrimitiveWrapper(source)
+                    ? ClassUtils.wrapperToPrimitive(source)
+                    : source;
         }
-        return clazz;
     }
 
     private DataType updateInnerDataType(DataType elementDataType) {
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
index a7d824009bf..ce9239266f2 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.inference.TypeTransformations;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 
 import org.junit.jupiter.api.Test;
 
@@ -220,4 +222,16 @@ class DataTypeTest {
         assertThat(DataType.getFields(ARRAY(INT()))).isEmpty();
         assertThat(DataType.getFields(INT())).isEmpty();
     }
+
+    @Test
+    void testArrayConversionClass() {
+        assertThat(DataTypes.ARRAY(INT())).hasConversionClass(Integer[].class);
+        
assertThat(DataTypes.ARRAY(INT().notNull())).hasConversionClass(int[].class);
+        DataType type = DataTypes.ARRAY(INT());
+        assertThat(DataTypeUtils.transform(type, 
TypeTransformations.toNullable()))
+                .hasConversionClass(Integer[].class);
+        type = DataTypes.ARRAY(INT()).bridgedTo(int[].class);
+        assertThat(DataTypeUtils.transform(type, 
TypeTransformations.toNullable()))
+                .hasConversionClass(int[].class);
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
index 82ea5c5d024..43377e613df 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
@@ -1204,7 +1204,7 @@ public class CastFunctionITCase extends 
BuiltInFunctionTestBase {
                                 new Long[] {1L, null, 2L})
                         .build(),
                 CastTestSpecBuilder.testCastTo(ARRAY(BIGINT().notNull()))
-                        .fromCase(ARRAY(INT().notNull()), new Integer[] {1, 
2}, new Long[] {1L, 2L})
+                        .fromCase(ARRAY(INT().notNull()), new Integer[] {1, 
2}, new long[] {1L, 2L})
                         .build(),
                 CastTestSpecBuilder.testCastTo(ROW(BIGINT(), BIGINT(), 
STRING(), ARRAY(STRING())))
                         .fromCase(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index a65105c26c4..0daa554d6e6 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -1329,6 +1329,50 @@ public class FunctionITCase extends StreamingTestBase {
                 "drop function lowerUdf");
     }
 
+    @Test
+    public void testArrayWithPrimitiveType() {
+        List<Row> sourceData = Arrays.asList(Row.of(1, 2), Row.of(3, 4));
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        tEnv().executeSql(
+                        "CREATE TABLE SourceTable(i INT NOT NULL, j INT NOT 
NULL) WITH ('connector' = 'COLLECTION')");
+        tEnv().executeSql(
+                        "CREATE FUNCTION row_of_array AS '"
+                                + RowOfArrayWithIntFunction.class.getName()
+                                + "'");
+        List<Row> rows =
+                CollectionUtil.iteratorToList(
+                        tEnv().executeSql("SELECT row_of_array(i, j) FROM 
SourceTable").collect());
+        assertThat(rows)
+                .isEqualTo(
+                        Arrays.asList(
+                                Row.of(Row.of((Object) new int[] {1, 2})),
+                                Row.of(Row.of((Object) new int[] {3, 4}))));
+    }
+
+    @Test
+    public void testArrayWithPrimitiveBoxedType() {
+        List<Row> sourceData = Arrays.asList(Row.of(1, null), Row.of(3, null));
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        tEnv().executeSql(
+                        "CREATE TABLE SourceTable(i INT NOT NULL, j INT) WITH 
('connector' = 'COLLECTION')");
+        tEnv().executeSql(
+                        "CREATE FUNCTION row_of_array AS '"
+                                + RowOfArrayWithIntegerFunction.class.getName()
+                                + "'");
+        List<Row> rows =
+                CollectionUtil.iteratorToList(
+                        tEnv().executeSql("SELECT row_of_array(i, j) FROM 
SourceTable").collect());
+        assertThat(rows)
+                .isEqualTo(
+                        Arrays.asList(
+                                Row.of(Row.of((Object) new Integer[] {1, 
null})),
+                                Row.of(Row.of((Object) new Integer[] {3, 
null}))));
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Test functions
     // 
--------------------------------------------------------------------------------------------
@@ -1756,6 +1800,22 @@ public class FunctionITCase extends StreamingTestBase {
         }
     }
 
+    /** A function with Row of array with int as return type for test 
FLINK-31835. */
+    public static class RowOfArrayWithIntFunction extends ScalarFunction {
+        @DataTypeHint("Row<t ARRAY<INT NOT NULL>>")
+        public Row eval(int... v) {
+            return Row.of((Object) v);
+        }
+    }
+
+    /** A function with Row of array with integer as return type for test 
FLINK-31835. */
+    public static class RowOfArrayWithIntegerFunction extends ScalarFunction {
+        @DataTypeHint("Row<t ARRAY<INT>>")
+        public Row eval(Integer... v) {
+            return Row.of((Object) v);
+        }
+    }
+
     private interface FunctionCreator {
         void createFunction(TableEnvironment environment);
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java
index d4f0a6016a9..cf9f39da8b8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java
@@ -332,7 +332,23 @@ public class ValuesITCase extends StreamingTestBase {
         mapData.put(1, 1);
         mapData.put(2, 2);
 
-        Row row = Row.of(mapData, Row.of(1, 2, 3), new Integer[] {1, 2});
+        Row row = Row.of(mapData, Row.of(1, 2, 3), new int[] {1, 2});
+        Table values = tEnv().fromValues(Collections.singletonList(row));
+        tEnv().createTemporaryView("values_t", values);
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv().executeSql("select * from values_t").collect());
+
+        assertThat(results).containsExactly(row);
+    }
+
+    @Test
+    public void testArrayWithNullablePrimitiveType() {
+        Map<Integer, Integer> mapData = new HashMap<>();
+        mapData.put(1, 1);
+        mapData.put(2, 2);
+
+        Row row = Row.of(mapData, Row.of(1, 2, 3), new Integer[] {1, 2, null});
         Table values = tEnv().fromValues(Collections.singletonList(row));
         tEnv().createTemporaryView("values_t", values);
         List<Row> results =
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index 9662d7179b3..f83a74a312f 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -196,6 +196,14 @@ public class DataStructureConvertersTest {
                                                 1, 2, 3,
                                                 4))), // test List that is not 
backed by an array
 
+                // test for Array with default conversion class
+                TestSpec.forDataType(ARRAY(INT().notNull()))
+                        .disableBridging()
+                        .convertedTo(int[].class, new int[] {1, 2, 3, 4}),
+                TestSpec.forDataType(ARRAY(INT()))
+                        .disableBridging()
+                        .convertedTo(Integer[].class, new Integer[] {1, 2, 3, 
4}),
+
                 // arrays of TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE 
are skipped for
                 // simplicity
 
@@ -368,10 +376,14 @@ public class DataStructureConvertersTest {
     @Test
     public void testConversions() {
         for (Map.Entry<Class<?>, Object> from : 
testSpec.conversions.entrySet()) {
-            final DataType fromDataType = 
testSpec.dataType.bridgedTo(from.getKey());
+            DataType fromDataType = testSpec.dataType;
+            if (testSpec.bridgeToTargetClass) {
+                fromDataType = testSpec.dataType.bridgedTo(from.getKey());
+            }
 
             if (testSpec.expectedErrorMessage != null) {
-                assertThatThrownBy(() -> 
DataStructureConverters.getConverter(fromDataType))
+                final DataType type = fromDataType;
+                assertThatThrownBy(() -> 
DataStructureConverters.getConverter(type))
                         .isInstanceOf(TableException.class)
                         .hasMessage(testSpec.expectedErrorMessage);
             } else {
@@ -414,6 +426,8 @@ public class DataStructureConvertersTest {
 
         private final Map<Class<?>, Object> conversionsWithAnotherValue;
 
+        private boolean bridgeToTargetClass;
+
         private @Nullable String expectedErrorMessage;
 
         private TestSpec(String description, DataType dataType) {
@@ -421,6 +435,7 @@ public class DataStructureConvertersTest {
             this.dataType = dataType;
             this.conversions = new LinkedHashMap<>();
             this.conversionsWithAnotherValue = new LinkedHashMap<>();
+            this.bridgeToTargetClass = true;
         }
 
         static TestSpec forDataType(AbstractDataType<?> dataType) {
@@ -453,6 +468,11 @@ public class DataStructureConvertersTest {
             return this;
         }
 
+        TestSpec disableBridging() {
+            this.bridgeToTargetClass = false;
+            return this;
+        }
+
         @Override
         public String toString() {
             return description;
diff --git 
a/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java
 
b/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java
index 8ad96c2262b..df553f163f2 100644
--- 
a/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java
+++ 
b/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java
@@ -57,7 +57,7 @@ class TableAssertionTest {
         BinaryRowData binaryRowData =
                 new RowDataSerializer((RowType) dataType.getLogicalType())
                         .toBinaryRow(genericRowData);
-        Row row = Row.of(10, "my string", new Boolean[] {true, false});
+        Row row = Row.of(10, "my string", new boolean[] {true, false});
 
         // Test equality with RowData
         assertThat(binaryRowData)

Reply via email to