This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6834ed1  [FLINK-18417][table] Support List as a conversion class for ARRAY
6834ed1 is described below

commit 6834ed181aa9edda6b9c7fb61696de93170a88d1
Author: Timo Walther <[email protected]>
AuthorDate: Tue Jun 23 17:16:35 2020 +0200

    [FLINK-18417][table] Support List as a conversion class for ARRAY
    
    This closes #12765.
---
 docs/dev/table/types.md                            |  2 +
 docs/dev/table/types.zh.md                         |  2 +
 .../table/types/extraction/DataTypeExtractor.java  | 40 ++++++++--
 .../flink/table/types/logical/ArrayType.java       |  4 +
 .../apache/flink/table/types/LogicalTypesTest.java |  6 +-
 .../types/extraction/DataTypeExtractorTest.java    | 16 ++++
 .../table/data/conversion/ArrayListConverter.java  | 86 ++++++++++++++++++++++
 .../data/conversion/ArrayObjectArrayConverter.java |  4 +-
 .../data/conversion/DataStructureConverters.java   |  8 ++
 .../table/data/DataStructureConvertersTest.java    | 55 ++++++++++++--
 10 files changed, 207 insertions(+), 16 deletions(-)

diff --git a/docs/dev/table/types.md b/docs/dev/table/types.md
index 16991d6..698deca 100644
--- a/docs/dev/table/types.md
+++ b/docs/dev/table/types.md
@@ -1007,6 +1007,8 @@ equivalent to `ARRAY<INT>`.
 | Java Type                              | Input | Output | Remarks                           |
 |:---------------------------------------|:-----:|:------:|:----------------------------------|
 |*t*`[]`                                 | (X)   | (X)    | Depends on the subtype. *Default* |
+| `java.util.List<t>`                    | X     | X      |                                   |
+| *subclass* of `java.util.List<t>`      | X     |        |                                   |
 |`org.apache.flink.table.data.ArrayData` | X     | X      | Internal data structure.          |
 
 #### `MAP`
diff --git a/docs/dev/table/types.zh.md b/docs/dev/table/types.zh.md
index 252ff02..582bcd4 100644
--- a/docs/dev/table/types.zh.md
+++ b/docs/dev/table/types.zh.md
@@ -925,6 +925,8 @@ DataTypes.ARRAY(t)
 | Java 类型 | 输入 | 输出 | 备注                           |
 |:----------|:-----:|:------:|:----------------------------------|
 |*t*`[]`    | (X)   | (X)    | 依赖于子类型。 *缺省* |
+|`java.util.List<t>`    | X   | X    |            |
+| *subclass* of `java.util.List<t>`    | X     |        |                  |
 
 #### `MAP`
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
index ff353fd..3b20183 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
@@ -314,15 +314,35 @@ public final class DataTypeExtractor {
                        return DataTypes.ARRAY(
                                extractDataTypeOrRaw(template, typeHierarchy, 
genericArray.getGenericComponentType()));
                }
+
+               final Class<?> clazz = toClass(type);
+               if (clazz == null) {
+                       return null;
+               }
+
                // for my.custom.Pojo[][]
-               else if (type instanceof Class) {
-                       final Class<?> clazz = (Class<?>) type;
-                       if (clazz.isArray()) {
-                               return DataTypes.ARRAY(
-                                       extractDataTypeOrRaw(template, 
typeHierarchy, clazz.getComponentType()));
-                       }
+               if (clazz.isArray()) {
+                       return DataTypes.ARRAY(
+                               extractDataTypeOrRaw(template, typeHierarchy, 
clazz.getComponentType()));
                }
-               return null;
+
+               // for List<T>
+               // we only allow List here (not a subclass) because we cannot 
guarantee more specific
+               // data structures after conversion
+               if (clazz != List.class) {
+                       return null;
+               }
+               if (!(type instanceof ParameterizedType)) {
+                       throw extractionError(
+                               "The class '%s' needs generic parameters for an 
array type.",
+                               List.class.getName());
+               }
+               final ParameterizedType parameterizedType = (ParameterizedType) 
type;
+               final DataType element = extractDataTypeOrRaw(
+                       template,
+                       typeHierarchy,
+                       parameterizedType.getActualTypeArguments()[0]);
+               return DataTypes.ARRAY(element).bridgedTo(List.class);
        }
 
        private @Nullable DataType extractEnforcedRawType(DataTypeTemplate 
template, Type type) {
@@ -429,11 +449,15 @@ public final class DataTypeExtractor {
 
        private @Nullable DataType extractMapType(DataTypeTemplate template, 
List<Type> typeHierarchy, Type type) {
                final Class<?> clazz = toClass(type);
+               // we only allow Map here (not a subclass) because we cannot 
guarantee more specific
+               // data structures after conversion
                if (clazz != Map.class) {
                        return null;
                }
                if (!(type instanceof ParameterizedType)) {
-                       throw extractionError("Raw map type needs generic 
parameters.");
+                       throw extractionError(
+                               "The class '%s' needs generic parameters for a 
map type.",
+                               Map.class.getName());
                }
                final ParameterizedType parameterizedType = (ParameterizedType) 
type;
                final DataType key = extractDataTypeOrRaw(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java
index f563863..2d87279 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ArrayType.java
@@ -42,6 +42,7 @@ public final class ArrayType extends LogicalType {
        public static final String FORMAT = "ARRAY<%s>";
 
        private static final Set<String> INPUT_OUTPUT_CONVERSION = 
conversionSet(
+               List.class.getName(),
                ArrayData.class.getName());
 
        private final LogicalType elementType;
@@ -76,6 +77,9 @@ public final class ArrayType extends LogicalType {
 
        @Override
        public boolean supportsInputConversion(Class<?> clazz) {
+               if (List.class.isAssignableFrom(clazz)) {
+                       return true;
+               }
                if (INPUT_OUTPUT_CONVERSION.contains(clazz.getName())) {
                        return true;
                }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index eac4014..85395b9 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -68,9 +68,11 @@ import org.junit.Test;
 
 import java.math.BigDecimal;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -405,8 +407,8 @@ public class LogicalTypesTest {
                        new ArrayType(new TimestampType()),
                        "ARRAY<TIMESTAMP(6)>",
                        "ARRAY<TIMESTAMP(6)>",
-                       new Class[]{java.sql.Timestamp[].class, 
java.time.LocalDateTime[].class},
-                       new Class[]{java.sql.Timestamp[].class, 
java.time.LocalDateTime[].class},
+                       new Class[]{java.sql.Timestamp[].class, 
java.time.LocalDateTime[].class, List.class, ArrayList.class},
+                       new Class[]{java.sql.Timestamp[].class, 
java.time.LocalDateTime[].class, List.class},
                        new LogicalType[]{new TimestampType()},
                        new ArrayType(new SmallIntType())
                );
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
index 2d60a76..87322c6 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
@@ -305,6 +305,16 @@ public class DataTypeExtractorTest {
                                        "Could not extract a data type from 
'java.util.HashMap<java.lang.Integer, java.lang.String>'. " +
                                                "Interpreting it as a 
structured type was also not successful."),
 
+                       TestSpec
+                               .forGeneric(
+                                       "ARRAY type with List conversion class",
+                                       TableFunction.class, 0, 
TableFunctionWithList.class)
+                               .expectDataType(
+                                       DataTypes.ARRAY(
+                                               
DataTypes.ARRAY(DataTypes.STRING()).bridgedTo(List.class)
+                                       ).bridgedTo(List.class)
+                               ),
+
                        // simple structured type without RAW type
                        TestSpec
                                .forType(SimplePojo.class)
@@ -744,6 +754,12 @@ public class DataTypeExtractorTest {
 
        // 
--------------------------------------------------------------------------------------------
 
+       private static class TableFunctionWithList extends 
TableFunction<List<List<String>>> {
+
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
        /**
         * Complex POJO with raw types.
         */
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
new file mode 100644
index 0000000..8acd078
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayListConverter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.conversion;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper;
+
+/**
+ * Converter for {@link ArrayType} of {@link List} external type.
+ */
+@Internal
+public class ArrayListConverter<E> implements 
DataStructureConverter<ArrayData, List<E>> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final E[] arrayKind;
+
+       private final ArrayObjectArrayConverter<E> elementsConverter;
+
+       private ArrayListConverter(E[] arrayKind, ArrayObjectArrayConverter<E> 
elementsConverter) {
+               this.arrayKind = arrayKind;
+               this.elementsConverter = elementsConverter;
+       }
+
+       @Override
+       public void open(ClassLoader classLoader) {
+               elementsConverter.open(classLoader);
+       }
+
+       @Override
+       public ArrayData toInternal(List<E> external) {
+               return 
elementsConverter.toInternal(external.toArray(arrayKind));
+       }
+
+       @Override
+       public List<E> toExternal(ArrayData internal) {
+               return Arrays.asList(elementsConverter.toExternal(internal));
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Factory method
+       // 
--------------------------------------------------------------------------------------------
+
+       public static ArrayListConverter<?> create(DataType dataType) {
+               final DataType elementDataType = dataType.getChildren().get(0);
+               return new ArrayListConverter<>(
+                       
createObjectArrayKind(elementDataType.getConversionClass()),
+                       
ArrayObjectArrayConverter.createForElement(elementDataType));
+       }
+
+       /**
+        * Creates the kind of array for {@link List#toArray(Object[])}.
+        */
+       private static Object[] createObjectArrayKind(Class<?> elementClazz) {
+               // e.g. int[] is not a Object[]
+               if (elementClazz.isPrimitive()) {
+                       return (Object[]) 
Array.newInstance(primitiveToWrapper(elementClazz), 0);
+               }
+               // e.g. int[][] and Integer[] are Object[]
+               return (Object[]) Array.newInstance(elementClazz, 0);
+       }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
index c191c55..c8c7192 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/ArrayObjectArrayConverter.java
@@ -34,6 +34,8 @@ import org.apache.commons.lang3.ArrayUtils;
 import java.io.Serializable;
 import java.lang.reflect.Array;
 
+import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper;
+
 /**
  * Converter for {@link ArrayType} of nested primitive or object arrays 
external types.
  */
@@ -168,7 +170,7 @@ public class ArrayObjectArrayConverter<E> implements 
DataStructureConverter<Arra
        public static <E> ArrayObjectArrayConverter<E> 
createForElement(DataType elementDataType) {
                final LogicalType elementType = 
elementDataType.getLogicalType();
                return new ArrayObjectArrayConverter<>(
-                       (Class<E>) elementDataType.getConversionClass(),
+                       (Class<E>) 
primitiveToWrapper(elementDataType.getConversionClass()),
                        BinaryArrayData.calculateFixLengthPartSize(elementType),
                        BinaryArrayWriter.createNullSetter(elementType),
                        BinaryWriter.createValueSetter(elementType),
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
index 2548733..89dcea6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java
@@ -34,6 +34,7 @@ import org.apache.flink.types.Row;
 
 import java.math.BigDecimal;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Supplier;
@@ -146,10 +147,17 @@ public final class DataStructureConverters {
                // special cases
                switch (logicalType.getTypeRoot()) {
                        case ARRAY:
+                               // for subclasses of List
+                               if 
(List.class.isAssignableFrom(dataType.getConversionClass())) {
+                                       return 
ArrayListConverter.create(dataType);
+                               }
+                               // for non-primitive arrays
                                return 
ArrayObjectArrayConverter.create(dataType);
                        case MULTISET:
+                               // for subclasses of Map
                                return 
MapMapConverter.createForMultisetType(dataType);
                        case MAP:
+                               // for subclasses of Map
                                return 
MapMapConverter.createForMapType(dataType);
                        case DISTINCT_TYPE:
                                return 
getConverterInternal(dataType.getChildren().get(0));
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index 29710fa..bf423bc 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -53,8 +53,10 @@ import java.time.Period;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -205,12 +207,14 @@ public class DataStructureConvertersTest {
                        TestSpec
                                .forDataType(ARRAY(BOOLEAN()))
                                .convertedTo(Boolean[].class, new 
Boolean[]{true, null, true, true})
+                               .convertedTo(List.class, Arrays.asList(true, 
null, true, true))
                                .convertedTo(ArrayData.class, new 
GenericArrayData(new Boolean[]{true, null, true, true})),
 
                        TestSpec
-                               .forDataType(ARRAY(INT().notNull()))
+                               
.forDataType(ARRAY(INT().notNull().bridgedTo(int.class))) // int.class should 
not have an impact
                                .convertedTo(int[].class, new int[]{1, 2, 3, 4})
-                               .convertedTo(Integer[].class, new Integer[]{1, 
2, 3, 4}),
+                               .convertedTo(Integer[].class, new Integer[]{1, 
2, 3, 4})
+                               .convertedTo(List.class, new 
LinkedList<>(Arrays.asList(1, 2, 3, 4))), // test List that is not backed by an 
array
 
                        // arrays of TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, 
DOUBLE are skipped for simplicity
 
@@ -218,10 +222,13 @@ public class DataStructureConvertersTest {
                                .forDataType(ARRAY(DATE()))
                                .convertedTo(
                                        LocalDate[].class,
-                                       new LocalDate[]{null, 
LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12")}),
+                                       new LocalDate[]{null, 
LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12")})
+                               .convertedTo(
+                                       List.class,
+                                       Arrays.asList(null, 
LocalDate.parse("2010-11-12"), null, LocalDate.parse("2010-11-12"))),
 
                        TestSpec
-                               .forDataType(MAP(INT(), BOOLEAN()))
+                               .forDataType(MAP(INT().bridgedTo(int.class), 
BOOLEAN())) // int.class should not have an impact
                                .convertedTo(Map.class, createIdentityMap())
                                .convertedTo(MapData.class, new 
GenericMapData(createIdentityMap())),
 
@@ -350,7 +357,16 @@ public class DataStructureConvertersTest {
                                        new Row[] {
                                                Row.of(null, null),
                                                Row.of(new 
PojoWithImmutableFields(10, "Bob"), null)
-                                       })
+                                       }),
+
+                       TestSpec
+                               .forDataType(DataTypes.of(PojoWithList.class))
+                               .convertedTo(
+                                       PojoWithList.class,
+                                       new 
PojoWithList(Arrays.asList(Arrays.asList(1.0, null, 2.0, null), 
Collections.emptyList(), null)))
+                               .convertedTo(
+                                       Row.class,
+                                       Row.of(Arrays.asList(Arrays.asList(1.0, 
null, 2.0, null), Collections.emptyList(), null)))
                );
        }
 
@@ -757,4 +773,33 @@ public class DataStructureConvertersTest {
                        return result;
                }
        }
+
+       /**
+        * Pojo with {@link List}.
+        */
+       public static class PojoWithList {
+
+               public List<List<Double>> factors;
+
+               public PojoWithList(List<List<Double>> factors) {
+                       this.factors = factors;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       PojoWithList that = (PojoWithList) o;
+                       return Objects.equals(factors, that.factors);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(factors);
+               }
+       }
 }

Reply via email to