This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7588a4dcda5f76d3606ba5d3982b48ebae8b653a Author: Zhanghao Chen <m...@outlook.com> AuthorDate: Mon Jan 6 21:04:09 2025 +0800 [FLINK-35555][core] Introduce Nullable{Map/List/Set}TypeInfo for collection type serialization with null value support This closes #25797 --- .../flink/api/java/typeutils/ListTypeInfo.java | 2 +- .../api/java/typeutils/NullableListTypeInfo.java | 60 ++++++++++++++++ .../api/java/typeutils/NullableMapTypeInfo.java | 63 +++++++++++++++++ .../api/java/typeutils/NullableSetTypeInfo.java | 58 ++++++++++++++++ .../flink/api/java/typeutils/SetTypeInfo.java | 2 +- .../flink/api/java/typeutils/TypeExtractor.java | 8 +-- .../typeutils/base/NullableListSerializerTest.java | 78 +++++++++++++++++++++ .../typeutils/base/NullableMapSerializerTest.java | 80 ++++++++++++++++++++++ .../typeutils/base/NullableSetSerializerTest.java | 78 +++++++++++++++++++++ .../java/typeutils/NullableListTypeInfoTest.java | 35 ++++++++++ .../java/typeutils/NullableMapTypeInfoTest.java | 35 ++++++++++ .../java/typeutils/NullableSetTypeInfoTest.java | 35 ++++++++++ .../api/java/typeutils/PojoTypeExtractionTest.java | 3 +- 13 files changed, 530 insertions(+), 7 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java index e58544f337a..3385e3ca173 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java @@ -34,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <T> The type of the elements in the list. */ @PublicEvolving -public final class ListTypeInfo<T> extends TypeInformation<List<T>> { +public class ListTypeInfo<T> extends TypeInformation<List<T>> { private static final long serialVersionUID = 1L; diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableListTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableListTypeInfo.java new file mode 100644 index 00000000000..63448242a35 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableListTypeInfo.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.java.typeutils.runtime.NullableSerializer; + +import java.util.List; + +/** + * A {@link TypeInformation} for the list types of the Java API, accepting null collection and null + * elements. + * + * @param <T> The type of the elements in the list. + */ +@PublicEvolving +public class NullableListTypeInfo<T> extends ListTypeInfo<T> { + + public NullableListTypeInfo(Class<T> elementTypeClass) { + super(elementTypeClass); + } + + public NullableListTypeInfo(TypeInformation<T> elementTypeInfo) { + super(elementTypeInfo); + } + + @Override + public TypeSerializer<List<T>> createSerializer(SerializerConfig config) { + // ListSerializer does not support null elements + TypeSerializer<T> elementTypeSerializer = + NullableSerializer.wrap(getElementTypeInfo().createSerializer(config), false); + ListSerializer<T> listSerializer = new ListSerializer<>(elementTypeSerializer); + return NullableSerializer.wrap(listSerializer, false); + } + + @Override + public String toString() { + return "NullableList<" + getElementTypeInfo() + '>'; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfo.java new file mode 100644 index 00000000000..408f34426c7 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfo.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.api.java.typeutils.runtime.NullableSerializer; + +import java.util.Map; + +/** + * A {@link TypeInformation} for the map types of the Java API, accepting null collection and null + * key/values. + * + * @param <K> The type of the keys in the map. + * @param <V> The type of the values in the map. + */ +@PublicEvolving +public class NullableMapTypeInfo<K, V> extends MapTypeInfo<K, V> { + + public NullableMapTypeInfo(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo) { + super(keyTypeInfo, valueTypeInfo); + } + + public NullableMapTypeInfo(Class<K> keyClass, Class<V> valueClass) { + super(keyClass, valueClass); + } + + @Override + public TypeSerializer<Map<K, V>> createSerializer(SerializerConfig config) { + // MapSerializer does not support null key + TypeSerializer<K> keyTypeSerializer = + NullableSerializer.wrap(getKeyTypeInfo().createSerializer(config), false); + TypeSerializer<V> valueTypeSerializer = getValueTypeInfo().createSerializer(config); + MapSerializer<K, V> mapSerializer = + new MapSerializer<>(keyTypeSerializer, valueTypeSerializer); + return NullableSerializer.wrap(mapSerializer, false); + } + + @Override + public String toString() { + return "NullableMap<" + getKeyTypeInfo() + ", " + getValueTypeInfo() + ">"; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfo.java new file mode 100644 index 00000000000..296fd1d7712 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfo.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.SetSerializer; +import org.apache.flink.api.java.typeutils.runtime.NullableSerializer; + +import java.util.Set; + +/** + * A {@link TypeInformation} for the set types of the Java API, accepting null collection and null + * elements. + * + * @param <T> The type of the elements in the set. + */ +@PublicEvolving +public class NullableSetTypeInfo<T> extends SetTypeInfo<T> { + + public NullableSetTypeInfo(Class<T> elementTypeClass) { + super(elementTypeClass); + } + + public NullableSetTypeInfo(TypeInformation<T> elementTypeInfo) { + super(elementTypeInfo); + } + + @Override + public TypeSerializer<Set<T>> createSerializer(SerializerConfig config) { + TypeSerializer<T> elementTypeSerializer = getElementTypeInfo().createSerializer(config); + SetSerializer<T> setSerializer = new SetSerializer<>(elementTypeSerializer); + return NullableSerializer.wrap(setSerializer, false); + } + + @Override + public String toString() { + return "NullableSet<" + getElementTypeInfo() + '>'; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java index fba52102ed6..3dd2b52d978 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java @@ -34,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <T> The type of the elements in the set. */ @PublicEvolving -public final class SetTypeInfo<T> extends TypeInformation<Set<T>> { +public class SetTypeInfo<T> extends TypeInformation<Set<T>> { private static final long serialVersionUID = 1L; diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index c1acc304829..9846ee40e85 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -42,7 +42,6 @@ import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInfo; import org.apache.flink.api.common.typeinfo.TypeInfoFactory; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; @@ -1989,15 +1988,16 @@ public class TypeExtractor { Class<?> valueClass = (Class<?>) actualTypeArguments[1]; TypeInformation<?> keyTypeInfo = createTypeInfo(keyClass); TypeInformation<?> valueTypeInfo = createTypeInfo(valueClass); - return (TypeInformation<OUT>) Types.MAP(keyTypeInfo, valueTypeInfo); + return (TypeInformation<OUT>) + new NullableMapTypeInfo<>(keyTypeInfo, valueTypeInfo); } else if (clazz.isAssignableFrom(List.class)) { Class<?> elementClass = (Class<?>) actualTypeArguments[0]; TypeInformation<?> elementTypeInfo = createTypeInfo(elementClass); - return (TypeInformation<OUT>) Types.LIST(elementTypeInfo); + return (TypeInformation<OUT>) new NullableListTypeInfo<>(elementTypeInfo); } else if (clazz.isAssignableFrom(Set.class)) { Class<?> elementClass = (Class<?>) actualTypeArguments[0]; TypeInformation<?> elementTypeInfo = createTypeInfo(elementClass); - return (TypeInformation<OUT>) Types.SET(elementTypeInfo); + return (TypeInformation<OUT>) new NullableSetTypeInfo<>(elementTypeInfo); } } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableListSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableListSerializerTest.java new file mode 100644 index 00000000000..9f3e59e00fd --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableListSerializerTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.serialization.SerializerConfigImpl; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.NullableListTypeInfo; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +/** A test for the serializer of {@link NullableListTypeInfo}. */ +class NullableListSerializerTest extends SerializerTestBase<List<Long>> { + + @Override + protected TypeSerializer<List<Long>> createSerializer() { + return new NullableListTypeInfo<>(BasicTypeInfo.LONG_TYPE_INFO) + .createSerializer(new SerializerConfigImpl()); + } + + @Override + protected int getLength() { + return -1; + } + + @SuppressWarnings("unchecked") + @Override + protected Class<List<Long>> getTypeClass() { + return (Class<List<Long>>) (Class<?>) List.class; + } + + @SuppressWarnings({"unchecked"}) + @Override + protected List<Long>[] getTestData() { + final Random rnd = new Random(123654789); + + // null list + final List<Long> list1 = null; + + // empty lists + final List<Long> list2 = Collections.emptyList(); + final List<Long> list3 = new ArrayList<>(); + + // single element lists + final List<Long> list4 = Collections.singletonList(55L); + final List<Long> list5 = new ArrayList<>(); + list5.add(777888L); + + // longer list with null value + final List<Long> list6 = new ArrayList<>(); + for (int i = 0; i < rnd.nextInt(200); i++) { + list6.add(rnd.nextLong()); + } + list6.add(null); + + return (List<Long>[]) new List[] {list1, list2, list3, list4, list5, list6}; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableMapSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableMapSerializerTest.java new file mode 100644 index 00000000000..069e858f31e --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableMapSerializerTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.serialization.SerializerConfigImpl; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.NullableMapTypeInfo; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +/** A test for the serializer of {@link NullableMapTypeInfo}. */ +class NullableMapSerializerTest extends SerializerTestBase<Map<Long, String>> { + + @Override + protected TypeSerializer<Map<Long, String>> createSerializer() { + return new NullableMapTypeInfo<>( + BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) + .createSerializer(new SerializerConfigImpl()); + } + + @Override + protected int getLength() { + return -1; + } + + @SuppressWarnings("unchecked") + @Override + protected Class<Map<Long, String>> getTypeClass() { + return (Class<Map<Long, String>>) (Class<?>) Map.class; + } + + @SuppressWarnings({"unchecked"}) + @Override + protected Map<Long, String>[] getTestData() { + final Random rnd = new Random(123654789); + + // null map + final Map<Long, String> map1 = null; + + // empty maps + final Map<Long, String> map2 = Collections.emptyMap(); + final Map<Long, String> map3 = new HashMap<>(); + + // single element maps + final Map<Long, String> map4 = Collections.singletonMap(0L, "hello"); + final Map<Long, String> map5 = new HashMap<>(); + map5.put(12345L, "12345L"); + + // longer maps with null key and null value + final Map<Long, String> map6 = new HashMap<>(); + for (int i = 0; i < rnd.nextInt(200); i++) { + map6.put(rnd.nextLong(), Long.toString(rnd.nextLong())); + } + map6.put(rnd.nextLong(), null); + map6.put(null, Long.toString(rnd.nextLong())); + + return (Map<Long, String>[]) new Map[] {map1, map2, map3, map4, map5, map6}; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableSetSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableSetSerializerTest.java new file mode 100644 index 00000000000..d74cafd4460 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableSetSerializerTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.serialization.SerializerConfigImpl; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.NullableSetTypeInfo; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +/** A test for the serializer of {@link NullableSetTypeInfo}. */ +class NullableSetSerializerTest extends SerializerTestBase<Set<Long>> { + + @Override + protected TypeSerializer<Set<Long>> createSerializer() { + return new NullableSetTypeInfo<>(BasicTypeInfo.LONG_TYPE_INFO) + .createSerializer(new SerializerConfigImpl()); + } + + @Override + protected int getLength() { + return -1; + } + + @SuppressWarnings("unchecked") + @Override + protected Class<Set<Long>> getTypeClass() { + return (Class<Set<Long>>) (Class<?>) Set.class; + } + + @SuppressWarnings({"unchecked"}) + @Override + protected Set<Long>[] getTestData() { + final Random rnd = new Random(123654789); + + // null set + final Set<Long> set1 = null; + + // empty sets + final Set<Long> set2 = Collections.emptySet(); + final Set<Long> set3 = new HashSet<>(); + + // single element sets + final Set<Long> set4 = Collections.singleton(55L); + final Set<Long> set5 = new HashSet<>(); + set5.add(12345L); + + // longer sets with null value + final Set<Long> set6 = new HashSet<>(); + for (int i = 0; i < rnd.nextInt(200); i++) { + set6.add(rnd.nextLong()); + } + set6.add(null); + + return (Set<Long>[]) new Set[] {set1, set2, set3, set4, set5, set6}; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableListTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableListTypeInfoTest.java new file mode 100644 index 00000000000..7c3172af2a8 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableListTypeInfoTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeInformationTestBase; + +/** Test for {@link NullableListTypeInfoTest}. */ +class NullableListTypeInfoTest extends TypeInformationTestBase<NullableListTypeInfo<?>> { + + @Override + protected NullableListTypeInfo<?>[] getTestData() { + return new NullableListTypeInfo<?>[] { + new NullableListTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO), + new NullableListTypeInfo<>(BasicTypeInfo.BOOLEAN_TYPE_INFO), + new NullableListTypeInfo<>(Object.class) + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfoTest.java new file mode 100644 index 00000000000..ae6b806e446 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfoTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeInformationTestBase; + +/** Test for {@link NullableMapTypeInfo}. */ +class NullableMapTypeInfoTest extends TypeInformationTestBase<NullableMapTypeInfo<?, ?>> { + + @Override + protected NullableMapTypeInfo<?, ?>[] getTestData() { + return new NullableMapTypeInfo<?, ?>[] { + new NullableMapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), + new NullableMapTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), + new NullableMapTypeInfo<>(String.class, Boolean.class) + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfoTest.java new file mode 100644 index 00000000000..f0ade0253a1 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfoTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeInformationTestBase; + +/** Test for {@link NullableSetTypeInfo}. */ +class NullableSetTypeInfoTest extends TypeInformationTestBase<NullableSetTypeInfo<?>> { + + @Override + protected NullableSetTypeInfo<?>[] getTestData() { + return new NullableSetTypeInfo<?>[] { + new NullableSetTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO), + new NullableSetTypeInfo<>(BasicTypeInfo.BOOLEAN_TYPE_INFO), + new NullableSetTypeInfo<>(Object.class), + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java index eef6ba5d868..28977a32488 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java @@ -421,7 +421,8 @@ public class PojoTypeExtractionTest { fail("already seen"); } collectionSeen = true; - assertThat(field.getTypeInformation()).isEqualTo(new ListTypeInfo<>(String.class)); + assertThat(field.getTypeInformation()) + .isEqualTo(new NullableListTypeInfo<>(String.class)); } else { fail("Unexpected field " + field);