This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a6adbdda0cd [FLINK-31835][table-planner] Fix the array type that can't
be converted from the external primitive array
a6adbdda0cd is described below
commit a6adbdda0cdf90635f0cd7a3427486bced301fbd
Author: Aitozi <[email protected]>
AuthorDate: Mon Jun 12 17:52:42 2023 +0800
[FLINK-31835][table-planner] Fix the array type that can't be converted
from the external primitive array
This closes #22485
---
.../flink/table/types/CollectionDataType.java | 24 ++++++++-
.../org/apache/flink/table/types/DataTypeTest.java | 14 +++++
.../planner/functions/CastFunctionITCase.java | 2 +-
.../planner/runtime/stream/sql/FunctionITCase.java | 60 ++++++++++++++++++++++
.../planner/runtime/stream/table/ValuesITCase.java | 18 ++++++-
.../table/data/DataStructureConvertersTest.java | 24 ++++++++-
.../flink/table/test/TableAssertionTest.java | 2 +-
7 files changed, 137 insertions(+), 7 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
index 239e36eb201..9188530a4ff 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.util.Preconditions;
+import org.apache.commons.lang3.ClassUtils;
+
import javax.annotation.Nullable;
import java.lang.reflect.Array;
@@ -118,9 +120,27 @@ public final class CollectionDataType extends DataType {
// arrays are a special case because their default conversion class
depends on the
// conversion class of the element type
if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz ==
null) {
- return Array.newInstance(elementDataType.getConversionClass(),
0).getClass();
+ Class<?> conversionClass =
+ wrapOrUnWrap(
+ elementDataType.getConversionClass(),
+ elementDataType.getLogicalType().isNullable());
+
+ return Array.newInstance(conversionClass, 0).getClass();
+ }
+ return wrapOrUnWrap(clazz,
elementDataType.getLogicalType().isNullable());
+ }
+
+ private static Class<?> wrapOrUnWrap(@Nullable Class<?> source, boolean
nullable) {
+ if (source == null) {
+ return null;
+ }
+ if (nullable) {
+ return source.isPrimitive() ?
ClassUtils.primitiveToWrapper(source) : source;
+ } else {
+ return ClassUtils.isPrimitiveWrapper(source)
+ ? ClassUtils.wrapperToPrimitive(source)
+ : source;
}
- return clazz;
}
private DataType updateInnerDataType(DataType elementDataType) {
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
index a7d824009bf..ce9239266f2 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.inference.TypeTransformations;
+import org.apache.flink.table.types.utils.DataTypeUtils;
import org.junit.jupiter.api.Test;
@@ -220,4 +222,16 @@ class DataTypeTest {
assertThat(DataType.getFields(ARRAY(INT()))).isEmpty();
assertThat(DataType.getFields(INT())).isEmpty();
}
+
+ @Test
+ void testArrayConversionClass() {
+ assertThat(DataTypes.ARRAY(INT())).hasConversionClass(Integer[].class);
+
assertThat(DataTypes.ARRAY(INT().notNull())).hasConversionClass(int[].class);
+ DataType type = DataTypes.ARRAY(INT());
+ assertThat(DataTypeUtils.transform(type,
TypeTransformations.toNullable()))
+ .hasConversionClass(Integer[].class);
+ type = DataTypes.ARRAY(INT()).bridgedTo(int[].class);
+ assertThat(DataTypeUtils.transform(type,
TypeTransformations.toNullable()))
+ .hasConversionClass(int[].class);
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
index 82ea5c5d024..43377e613df 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
@@ -1204,7 +1204,7 @@ public class CastFunctionITCase extends
BuiltInFunctionTestBase {
new Long[] {1L, null, 2L})
.build(),
CastTestSpecBuilder.testCastTo(ARRAY(BIGINT().notNull()))
- .fromCase(ARRAY(INT().notNull()), new Integer[] {1,
2}, new Long[] {1L, 2L})
+ .fromCase(ARRAY(INT().notNull()), new Integer[] {1,
2}, new long[] {1L, 2L})
.build(),
CastTestSpecBuilder.testCastTo(ROW(BIGINT(), BIGINT(),
STRING(), ARRAY(STRING())))
.fromCase(
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index a65105c26c4..0daa554d6e6 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -1329,6 +1329,50 @@ public class FunctionITCase extends StreamingTestBase {
"drop function lowerUdf");
}
+ @Test
+ public void testArrayWithPrimitiveType() {
+ List<Row> sourceData = Arrays.asList(Row.of(1, 2), Row.of(3, 4));
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+
+ tEnv().executeSql(
+ "CREATE TABLE SourceTable(i INT NOT NULL, j INT NOT
NULL) WITH ('connector' = 'COLLECTION')");
+ tEnv().executeSql(
+ "CREATE FUNCTION row_of_array AS '"
+ + RowOfArrayWithIntFunction.class.getName()
+ + "'");
+ List<Row> rows =
+ CollectionUtil.iteratorToList(
+ tEnv().executeSql("SELECT row_of_array(i, j) FROM
SourceTable").collect());
+ assertThat(rows)
+ .isEqualTo(
+ Arrays.asList(
+ Row.of(Row.of((Object) new int[] {1, 2})),
+ Row.of(Row.of((Object) new int[] {3, 4}))));
+ }
+
+ @Test
+ public void testArrayWithPrimitiveBoxedType() {
+ List<Row> sourceData = Arrays.asList(Row.of(1, null), Row.of(3, null));
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+
+ tEnv().executeSql(
+ "CREATE TABLE SourceTable(i INT NOT NULL, j INT) WITH
('connector' = 'COLLECTION')");
+ tEnv().executeSql(
+ "CREATE FUNCTION row_of_array AS '"
+ + RowOfArrayWithIntegerFunction.class.getName()
+ + "'");
+ List<Row> rows =
+ CollectionUtil.iteratorToList(
+ tEnv().executeSql("SELECT row_of_array(i, j) FROM
SourceTable").collect());
+ assertThat(rows)
+ .isEqualTo(
+ Arrays.asList(
+ Row.of(Row.of((Object) new Integer[] {1,
null})),
+ Row.of(Row.of((Object) new Integer[] {3,
null}))));
+ }
+
//
--------------------------------------------------------------------------------------------
// Test functions
//
--------------------------------------------------------------------------------------------
@@ -1756,6 +1800,22 @@ public class FunctionITCase extends StreamingTestBase {
}
}
+ /** A function with Row of array with int as return type for test
FLINK-31835. */
+ public static class RowOfArrayWithIntFunction extends ScalarFunction {
+ @DataTypeHint("Row<t ARRAY<INT NOT NULL>>")
+ public Row eval(int... v) {
+ return Row.of((Object) v);
+ }
+ }
+
+ /** A function with Row of array with integer as return type for test
FLINK-31835. */
+ public static class RowOfArrayWithIntegerFunction extends ScalarFunction {
+ @DataTypeHint("Row<t ARRAY<INT>>")
+ public Row eval(Integer... v) {
+ return Row.of((Object) v);
+ }
+ }
+
private interface FunctionCreator {
void createFunction(TableEnvironment environment);
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java
index d4f0a6016a9..cf9f39da8b8 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java
@@ -332,7 +332,23 @@ public class ValuesITCase extends StreamingTestBase {
mapData.put(1, 1);
mapData.put(2, 2);
- Row row = Row.of(mapData, Row.of(1, 2, 3), new Integer[] {1, 2});
+ Row row = Row.of(mapData, Row.of(1, 2, 3), new int[] {1, 2});
+ Table values = tEnv().fromValues(Collections.singletonList(row));
+ tEnv().createTemporaryView("values_t", values);
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tEnv().executeSql("select * from values_t").collect());
+
+ assertThat(results).containsExactly(row);
+ }
+
+ @Test
+ public void testArrayWithNullablePrimitiveType() {
+ Map<Integer, Integer> mapData = new HashMap<>();
+ mapData.put(1, 1);
+ mapData.put(2, 2);
+
+ Row row = Row.of(mapData, Row.of(1, 2, 3), new Integer[] {1, 2, null});
Table values = tEnv().fromValues(Collections.singletonList(row));
tEnv().createTemporaryView("values_t", values);
List<Row> results =
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index 9662d7179b3..f83a74a312f 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -196,6 +196,14 @@ public class DataStructureConvertersTest {
1, 2, 3,
4))), // test List that is not
backed by an array
+ // test for Array with default conversion class
+ TestSpec.forDataType(ARRAY(INT().notNull()))
+ .disableBridging()
+ .convertedTo(int[].class, new int[] {1, 2, 3, 4}),
+ TestSpec.forDataType(ARRAY(INT()))
+ .disableBridging()
+ .convertedTo(Integer[].class, new Integer[] {1, 2, 3,
4}),
+
// arrays of TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE
are skipped for
// simplicity
@@ -368,10 +376,14 @@ public class DataStructureConvertersTest {
@Test
public void testConversions() {
for (Map.Entry<Class<?>, Object> from :
testSpec.conversions.entrySet()) {
- final DataType fromDataType =
testSpec.dataType.bridgedTo(from.getKey());
+ DataType fromDataType = testSpec.dataType;
+ if (testSpec.bridgeToTargetClass) {
+ fromDataType = testSpec.dataType.bridgedTo(from.getKey());
+ }
if (testSpec.expectedErrorMessage != null) {
- assertThatThrownBy(() ->
DataStructureConverters.getConverter(fromDataType))
+ final DataType type = fromDataType;
+ assertThatThrownBy(() ->
DataStructureConverters.getConverter(type))
.isInstanceOf(TableException.class)
.hasMessage(testSpec.expectedErrorMessage);
} else {
@@ -414,6 +426,8 @@ public class DataStructureConvertersTest {
private final Map<Class<?>, Object> conversionsWithAnotherValue;
+ private boolean bridgeToTargetClass;
+
private @Nullable String expectedErrorMessage;
private TestSpec(String description, DataType dataType) {
@@ -421,6 +435,7 @@ public class DataStructureConvertersTest {
this.dataType = dataType;
this.conversions = new LinkedHashMap<>();
this.conversionsWithAnotherValue = new LinkedHashMap<>();
+ this.bridgeToTargetClass = true;
}
static TestSpec forDataType(AbstractDataType<?> dataType) {
@@ -453,6 +468,11 @@ public class DataStructureConvertersTest {
return this;
}
+ TestSpec disableBridging() {
+ this.bridgeToTargetClass = false;
+ return this;
+ }
+
@Override
public String toString() {
return description;
diff --git
a/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java
b/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java
index 8ad96c2262b..df553f163f2 100644
---
a/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java
+++
b/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java
@@ -57,7 +57,7 @@ class TableAssertionTest {
BinaryRowData binaryRowData =
new RowDataSerializer((RowType) dataType.getLogicalType())
.toBinaryRow(genericRowData);
- Row row = Row.of(10, "my string", new Boolean[] {true, false});
+ Row row = Row.of(10, "my string", new boolean[] {true, false});
// Test equality with RowData
assertThat(binaryRowData)