This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 24b661f782e494b2287b4f8ed48fa5a7197eada7
Author: Zhanghao Chen <m...@outlook.com>
AuthorDate: Mon Apr 8 21:21:21 2024 +0800

    [FLINK-34123][core] Introduce built-in serialization support for Map, List, and Collection
---
 .../flink/api/java/typeutils/TypeExtractor.java    | 29 +++++++
 .../api/java/typeutils/PojoTypeExtractionTest.java |  2 +-
 .../api/java/typeutils/TypeExtractorTest.java      | 92 ++++++++++++++++++++--
 .../org/apache/flink/types/PojoTestUtilsTest.java  |  2 +-
 4 files changed, 118 insertions(+), 7 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 93902c4d84c..b761186dd52 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -42,6 +42,7 @@ import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -66,6 +67,7 @@ import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1968,6 +1970,33 @@ public class TypeExtractor {
             return new EnumTypeInfo(clazz);
         }
 
+        // check for parameterized Collections, requirement:
+        // 1. Interface types: the underlying implementation types are not 
preserved across
+        // serialization
+        // 2. Concrete type arguments: Flink needs them to dispatch 
serialization of element types
+        // Example:
+        // - OK: List<String>, Collection<String>
+        // - not OK: LinkedList<String> (implementation type), List (raw 
type), List<T> (generic
+        // type argument), or List<?> (wildcard type argument)
+        if (parameterizedType != null) {
+            Type[] actualTypeArguments = 
parameterizedType.getActualTypeArguments();
+            boolean allTypeArgumentsConcrete =
+                    Arrays.stream(actualTypeArguments).allMatch(arg -> arg 
instanceof Class<?>);
+            if (allTypeArgumentsConcrete) {
+                if (clazz.isAssignableFrom(Map.class)) {
+                    Class<?> keyClass = (Class<?>) actualTypeArguments[0];
+                    Class<?> valueClass = (Class<?>) actualTypeArguments[1];
+                    TypeInformation<?> keyTypeInfo = createTypeInfo(keyClass);
+                    TypeInformation<?> valueTypeInfo = 
createTypeInfo(valueClass);
+                    return (TypeInformation<OUT>) Types.MAP(keyTypeInfo, 
valueTypeInfo);
+                } else if (clazz.isAssignableFrom(List.class)) {
+                    Class<?> elementClass = (Class<?>) actualTypeArguments[0];
+                    TypeInformation<?> elementTypeInfo = 
createTypeInfo(elementClass);
+                    return (TypeInformation<OUT>) Types.LIST(elementTypeInfo);
+                }
+            }
+        }
+
         // special case for POJOs generated by Avro.
         if (AvroUtils.isAvroSpecificRecord(clazz)) {
             return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
index bc5506bb836..eef6ba5d868 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
@@ -421,7 +421,7 @@ public class PojoTypeExtractionTest {
                     fail("already seen");
                 }
                 collectionSeen = true;
-                assertThat(field.getTypeInformation()).isEqualTo(new 
GenericTypeInfo(List.class));
+                assertThat(field.getTypeInformation()).isEqualTo(new 
ListTypeInfo<>(String.class));
 
             } else {
                 fail("Unexpected field " + field);
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
index 84c2a9255f0..72a1df065f7 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
@@ -63,7 +63,10 @@ import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -2316,15 +2319,15 @@ public class TypeExtractorTest {
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Test
     void testGenericTypeWithSubclassInput() {
-        Map<String, Object> inputMap = new HashMap<>();
+        HashMap<String, Object> inputMap = new LinkedHashMap<>();
         inputMap.put("a", "b");
         TypeInformation<?> inputType = 
TypeExtractor.createTypeInfo(inputMap.getClass());
 
         MapFunction<?, ?> function =
-                new MapFunction<Map<String, Object>, Map<String, Object>>() {
+                new MapFunction<HashMap<String, Object>, HashMap<String, 
Object>>() {
 
                     @Override
-                    public Map<String, Object> map(Map<String, Object> 
stringObjectMap)
+                    public HashMap<String, Object> map(HashMap<String, Object> 
stringObjectMap)
                             throws Exception {
                         return stringObjectMap;
                     }
@@ -2332,7 +2335,7 @@ public class TypeExtractorTest {
 
         TypeInformation<?> ti =
                 TypeExtractor.getMapReturnTypes(function, (TypeInformation) 
inputType);
-        TypeInformation<?> expected = TypeExtractor.createTypeInfo(Map.class);
+        TypeInformation<?> expected = 
TypeExtractor.createTypeInfo(HashMap.class);
         assertThat(ti).isEqualTo(expected);
     }
 
@@ -2344,7 +2347,9 @@ public class TypeExtractorTest {
                             TypeInformation<?> inputType = 
TypeExtractor.createTypeInfo(Map.class);
 
                             MapFunction<?, ?> function =
-                                    (MapFunction<HashMap<String, Object>, 
Map<String, Object>>)
+                                    (MapFunction<
+                                                    LinkedHashMap<String, 
Object>,
+                                                    Map<String, Object>>)
                                             stringObjectMap -> stringObjectMap;
                             TypeExtractor.getMapReturnTypes(function, 
(TypeInformation) inputType);
                         })
@@ -2500,4 +2505,81 @@ public class TypeExtractorTest {
         assertThat(TypeExtractor.getForObject(Timestamp.valueOf("1998-12-12 
12:37:45")))
                 .isEqualTo(SqlTimeTypeInfo.TIMESTAMP);
     }
+
+    @SuppressWarnings({"rawtypes"})
+    public static class PojoWithCollections<T> {
+        // Supported collection types with concrete type arguments, expected 
built-in serialization
+        // support
+        public Map<String, Integer> mapVal = new HashMap<>();
+        public List<String> listVal = new ArrayList<>();
+        public Collection<String> collectionVal = new ArrayList<>();
+
+        // Collection fields with unsupported collection types, treated as 
generic types
+        public LinkedList<String> linkedListVal = new LinkedList<>();
+
+        // Collection fields with raw type, treated as generic types
+        public List rawListVal = new ArrayList<>();
+
+        // Collection fields with generic type arguments, treated as generic 
types
+        public List<T> genericListVal = new ArrayList<>();
+
+        // Collection fields with wildcard type arguments, treated as generic 
types
+        public List<?> wildcardListVal = new ArrayList<>();
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    @Test
+    public <T> void testCollectionTypes() {
+        MapFunction<?, ?> function =
+                new MapFunction<PojoWithCollections<T>, 
PojoWithCollections<T>>() {
+                    @Override
+                    public PojoWithCollections map(PojoWithCollections<T> 
value) {
+                        return null;
+                    }
+                };
+        TypeInformation<?> ti =
+                TypeExtractor.getMapReturnTypes(
+                        function,
+                        (TypeInformation)
+                                TypeInformation.of(new 
TypeHint<PojoWithCollections<T>>() {}));
+        assertThat(ti).isInstanceOf(PojoTypeInfo.class);
+        testCollectionTypesInternal(ti);
+
+        // use getForClass()
+        TypeInformation<?> ti2 = 
TypeExtractor.getForClass(PojoWithCollections.class);
+        assertThat(ti2).isInstanceOf(PojoTypeInfo.class);
+        testCollectionTypesInternal(ti2);
+
+        // use getForObject()
+        PojoWithCollections<T> t = new PojoWithCollections<>();
+        TypeInformation<?> ti3 = TypeExtractor.getForObject(t);
+        assertThat(ti3).isInstanceOf(PojoTypeInfo.class);
+        testCollectionTypesInternal(ti3);
+    }
+
+    private void testCollectionTypesInternal(TypeInformation<?> ti) {
+        PojoTypeInfo<?> pojoTi = (PojoTypeInfo<?>) ti;
+        
assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("mapVal")).getTypeInformation())
+                .isInstanceOf(MapTypeInfo.class);
+        
assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("listVal")).getTypeInformation())
+                .isInstanceOf(ListTypeInfo.class);
+        assertThat(
+                        
pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("collectionVal"))
+                                .getTypeInformation())
+                .isInstanceOf(ListTypeInfo.class);
+        assertThat(
+                        
pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("linkedListVal"))
+                                .getTypeInformation())
+                .isInstanceOf(GenericTypeInfo.class);
+        
assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("rawListVal")).getTypeInformation())
+                .isInstanceOf(GenericTypeInfo.class);
+        assertThat(
+                        
pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("genericListVal"))
+                                .getTypeInformation())
+                .isInstanceOf(GenericTypeInfo.class);
+        assertThat(
+                        
pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("wildcardListVal"))
+                                .getTypeInformation())
+                .isInstanceOf(GenericTypeInfo.class);
+    }
 }
diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java
index 506124816b1..2cc1550741f 100644
--- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java
+++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java
@@ -76,7 +76,7 @@ class PojoTestUtilsTest {
     }
 
     public static class PojoRequiringKryo {
-        public List<Integer> x;
+        public List<?> x;
     }
 
     @TypeInfo(FooFactory.class)

Reply via email to