This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 24b661f782e494b2287b4f8ed48fa5a7197eada7 Author: Zhanghao Chen <m...@outlook.com> AuthorDate: Mon Apr 8 21:21:21 2024 +0800 [FLINK-34123][core] Introduce built-in serialization support for Map, List, and Collection --- .../flink/api/java/typeutils/TypeExtractor.java | 29 +++++++ .../api/java/typeutils/PojoTypeExtractionTest.java | 2 +- .../api/java/typeutils/TypeExtractorTest.java | 92 ++++++++++++++++++++-- .../org/apache/flink/types/PojoTestUtilsTest.java | 2 +- 4 files changed, 118 insertions(+), 7 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 93902c4d84c..b761186dd52 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -42,6 +42,7 @@ import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInfo; import org.apache.flink.api.common.typeinfo.TypeInfoFactory; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; @@ -66,6 +67,7 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1968,6 +1970,33 @@ public class TypeExtractor { return new EnumTypeInfo(clazz); } + // check for parameterized Collections, requirement: + // 1. Interface types: the underlying implementation types are not preserved across + // serialization + // 2. Concrete type arguments: Flink needs them to dispatch serialization of element types + // Example: + // - OK: List<String>, Collection<String> + // - not OK: LinkedList<String> (implementation type), List (raw type), List<T> (generic + // type argument), or List<?> (wildcard type argument) + if (parameterizedType != null) { + Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); + boolean allTypeArgumentsConcrete = + Arrays.stream(actualTypeArguments).allMatch(arg -> arg instanceof Class<?>); + if (allTypeArgumentsConcrete) { + if (clazz.isAssignableFrom(Map.class)) { + Class<?> keyClass = (Class<?>) actualTypeArguments[0]; + Class<?> valueClass = (Class<?>) actualTypeArguments[1]; + TypeInformation<?> keyTypeInfo = createTypeInfo(keyClass); + TypeInformation<?> valueTypeInfo = createTypeInfo(valueClass); + return (TypeInformation<OUT>) Types.MAP(keyTypeInfo, valueTypeInfo); + } else if (clazz.isAssignableFrom(List.class)) { + Class<?> elementClass = (Class<?>) actualTypeArguments[0]; + TypeInformation<?> elementTypeInfo = createTypeInfo(elementClass); + return (TypeInformation<OUT>) Types.LIST(elementTypeInfo); + } + } + } + // special case for POJOs generated by Avro. if (AvroUtils.isAvroSpecificRecord(clazz)) { return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz); diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java index bc5506bb836..eef6ba5d868 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java @@ -421,7 +421,7 @@ public class PojoTypeExtractionTest { fail("already seen"); } collectionSeen = true; - assertThat(field.getTypeInformation()).isEqualTo(new GenericTypeInfo(List.class)); + assertThat(field.getTypeInformation()).isEqualTo(new ListTypeInfo<>(String.class)); } else { fail("Unexpected field " + field); diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java index 84c2a9255f0..72a1df065f7 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java @@ -63,7 +63,10 @@ import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -2316,15 +2319,15 @@ public class TypeExtractorTest { @SuppressWarnings({"unchecked", "rawtypes"}) @Test void testGenericTypeWithSubclassInput() { - Map<String, Object> inputMap = new HashMap<>(); + HashMap<String, Object> inputMap = new LinkedHashMap<>(); inputMap.put("a", "b"); TypeInformation<?> inputType = TypeExtractor.createTypeInfo(inputMap.getClass()); MapFunction<?, ?> function = - new MapFunction<Map<String, Object>, Map<String, Object>>() { + new MapFunction<HashMap<String, Object>, HashMap<String, Object>>() { @Override - public Map<String, Object> map(Map<String, Object> stringObjectMap) + public HashMap<String, Object> map(HashMap<String, Object> stringObjectMap) throws Exception { return stringObjectMap; } @@ -2332,7 +2335,7 @@ public class TypeExtractorTest { TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType); - TypeInformation<?> expected = TypeExtractor.createTypeInfo(Map.class); + TypeInformation<?> expected = TypeExtractor.createTypeInfo(HashMap.class); assertThat(ti).isEqualTo(expected); } @@ -2344,7 +2347,9 @@ public class TypeExtractorTest { TypeInformation<?> inputType = TypeExtractor.createTypeInfo(Map.class); MapFunction<?, ?> function = - (MapFunction<HashMap<String, Object>, Map<String, Object>>) + (MapFunction< + LinkedHashMap<String, Object>, + Map<String, Object>>) stringObjectMap -> stringObjectMap; TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType); }) @@ -2500,4 +2505,81 @@ public class TypeExtractorTest { assertThat(TypeExtractor.getForObject(Timestamp.valueOf("1998-12-12 12:37:45"))) .isEqualTo(SqlTimeTypeInfo.TIMESTAMP); } + + @SuppressWarnings({"rawtypes"}) + public static class PojoWithCollections<T> { + // Supported collection types with concrete type arguments, expected built-in serialization + // support + public Map<String, Integer> mapVal = new HashMap<>(); + public List<String> listVal = new ArrayList<>(); + public Collection<String> collectionVal = new ArrayList<>(); + + // Collection fields with unsupported collection types, treated as generic types + public LinkedList<String> linkedListVal = new LinkedList<>(); + + // Collection fields with raw type, treated as generic types + public List rawListVal = new ArrayList<>(); + + // Collection fields with generic type arguments, treated as generic types + public List<T> genericListVal = new ArrayList<>(); + + // Collection fields with wildcard type arguments, treated as generic types + public List<?> wildcardListVal = new ArrayList<>(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public <T> void testCollectionTypes() { + MapFunction<?, ?> function = + new MapFunction<PojoWithCollections<T>, PojoWithCollections<T>>() { + @Override + public PojoWithCollections map(PojoWithCollections<T> value) { + return null; + } + }; + TypeInformation<?> ti = + TypeExtractor.getMapReturnTypes( + function, + (TypeInformation) + TypeInformation.of(new TypeHint<PojoWithCollections<T>>() {})); + assertThat(ti).isInstanceOf(PojoTypeInfo.class); + testCollectionTypesInternal(ti); + + // use getForClass() + TypeInformation<?> ti2 = TypeExtractor.getForClass(PojoWithCollections.class); + assertThat(ti2).isInstanceOf(PojoTypeInfo.class); + testCollectionTypesInternal(ti2); + + // use getForObject() + PojoWithCollections<T> t = new PojoWithCollections<>(); + TypeInformation<?> ti3 = TypeExtractor.getForObject(t); + assertThat(ti3).isInstanceOf(PojoTypeInfo.class); + testCollectionTypesInternal(ti3); + } + + private void testCollectionTypesInternal(TypeInformation<?> ti) { + PojoTypeInfo<?> pojoTi = (PojoTypeInfo<?>) ti; + assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("mapVal")).getTypeInformation()) + .isInstanceOf(MapTypeInfo.class); + assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("listVal")).getTypeInformation()) + .isInstanceOf(ListTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("collectionVal")) + .getTypeInformation()) + .isInstanceOf(ListTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("linkedListVal")) + .getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat(pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("rawListVal")).getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("genericListVal")) + .getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + assertThat( + pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("wildcardListVal")) + .getTypeInformation()) + .isInstanceOf(GenericTypeInfo.class); + } } diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java index 506124816b1..2cc1550741f 100644 --- a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java +++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/types/PojoTestUtilsTest.java @@ -76,7 +76,7 @@ class PojoTestUtilsTest { } public static class PojoRequiringKryo { - public List<Integer> x; + public List<?> x; } @TypeInfo(FooFactory.class)