This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7588a4dcda5f76d3606ba5d3982b48ebae8b653a
Author: Zhanghao Chen <m...@outlook.com>
AuthorDate: Mon Jan 6 21:04:09 2025 +0800

    [FLINK-35555][core] Introduce Nullable{Map/List/Set}TypeInfo for collection 
type serialization with null value support
    
    This closes #25797
---
 .../flink/api/java/typeutils/ListTypeInfo.java     |  2 +-
 .../api/java/typeutils/NullableListTypeInfo.java   | 60 ++++++++++++++++
 .../api/java/typeutils/NullableMapTypeInfo.java    | 63 +++++++++++++++++
 .../api/java/typeutils/NullableSetTypeInfo.java    | 58 ++++++++++++++++
 .../flink/api/java/typeutils/SetTypeInfo.java      |  2 +-
 .../flink/api/java/typeutils/TypeExtractor.java    |  8 +--
 .../typeutils/base/NullableListSerializerTest.java | 78 +++++++++++++++++++++
 .../typeutils/base/NullableMapSerializerTest.java  | 80 ++++++++++++++++++++++
 .../typeutils/base/NullableSetSerializerTest.java  | 78 +++++++++++++++++++++
 .../java/typeutils/NullableListTypeInfoTest.java   | 35 ++++++++++
 .../java/typeutils/NullableMapTypeInfoTest.java    | 35 ++++++++++
 .../java/typeutils/NullableSetTypeInfoTest.java    | 35 ++++++++++
 .../api/java/typeutils/PojoTypeExtractionTest.java |  3 +-
 13 files changed, 530 insertions(+), 7 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
index e58544f337a..3385e3ca173 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
@@ -34,7 +34,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> The type of the elements in the list.
  */
 @PublicEvolving
-public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
+public class ListTypeInfo<T> extends TypeInformation<List<T>> {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableListTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableListTypeInfo.java
new file mode 100644
index 00000000000..63448242a35
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableListTypeInfo.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+
+import java.util.List;
+
+/**
+ * A {@link TypeInformation} for the list types of the Java API, accepting 
null collection and null
+ * elements.
+ *
+ * @param <T> The type of the elements in the list.
+ */
+@PublicEvolving
+public class NullableListTypeInfo<T> extends ListTypeInfo<T> {
+
+    public NullableListTypeInfo(Class<T> elementTypeClass) {
+        super(elementTypeClass);
+    }
+
+    public NullableListTypeInfo(TypeInformation<T> elementTypeInfo) {
+        super(elementTypeInfo);
+    }
+
+    @Override
+    public TypeSerializer<List<T>> createSerializer(SerializerConfig config) {
+        // ListSerializer does not support null elements
+        TypeSerializer<T> elementTypeSerializer =
+                
NullableSerializer.wrap(getElementTypeInfo().createSerializer(config), false);
+        ListSerializer<T> listSerializer = new 
ListSerializer<>(elementTypeSerializer);
+        return NullableSerializer.wrap(listSerializer, false);
+    }
+
+    @Override
+    public String toString() {
+        return "NullableList<" + getElementTypeInfo() + '>';
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfo.java
new file mode 100644
index 00000000000..408f34426c7
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+
+import java.util.Map;
+
+/**
+ * A {@link TypeInformation} for the map types of the Java API, accepting null 
collection and null
+ * key/values.
+ *
+ * @param <K> The type of the keys in the map.
+ * @param <V> The type of the values in the map.
+ */
+@PublicEvolving
+public class NullableMapTypeInfo<K, V> extends MapTypeInfo<K, V> {
+
+    public NullableMapTypeInfo(TypeInformation<K> keyTypeInfo, 
TypeInformation<V> valueTypeInfo) {
+        super(keyTypeInfo, valueTypeInfo);
+    }
+
+    public NullableMapTypeInfo(Class<K> keyClass, Class<V> valueClass) {
+        super(keyClass, valueClass);
+    }
+
+    @Override
+    public TypeSerializer<Map<K, V>> createSerializer(SerializerConfig config) 
{
+        // MapSerializer does not support null key
+        TypeSerializer<K> keyTypeSerializer =
+                
NullableSerializer.wrap(getKeyTypeInfo().createSerializer(config), false);
+        TypeSerializer<V> valueTypeSerializer = 
getValueTypeInfo().createSerializer(config);
+        MapSerializer<K, V> mapSerializer =
+                new MapSerializer<>(keyTypeSerializer, valueTypeSerializer);
+        return NullableSerializer.wrap(mapSerializer, false);
+    }
+
+    @Override
+    public String toString() {
+        return "NullableMap<" + getKeyTypeInfo() + ", " + getValueTypeInfo() + 
">";
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfo.java
new file mode 100644
index 00000000000..296fd1d7712
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.SetSerializer;
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+
+import java.util.Set;
+
+/**
+ * A {@link TypeInformation} for the set types of the Java API, accepting null 
collection and null
+ * elements.
+ *
+ * @param <T> The type of the elements in the set.
+ */
+@PublicEvolving
+public class NullableSetTypeInfo<T> extends SetTypeInfo<T> {
+
+    public NullableSetTypeInfo(Class<T> elementTypeClass) {
+        super(elementTypeClass);
+    }
+
+    public NullableSetTypeInfo(TypeInformation<T> elementTypeInfo) {
+        super(elementTypeInfo);
+    }
+
+    @Override
+    public TypeSerializer<Set<T>> createSerializer(SerializerConfig config) {
+        TypeSerializer<T> elementTypeSerializer = 
getElementTypeInfo().createSerializer(config);
+        SetSerializer<T> setSerializer = new 
SetSerializer<>(elementTypeSerializer);
+        return NullableSerializer.wrap(setSerializer, false);
+    }
+
+    @Override
+    public String toString() {
+        return "NullableSet<" + getElementTypeInfo() + '>';
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java
index fba52102ed6..3dd2b52d978 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/SetTypeInfo.java
@@ -34,7 +34,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> The type of the elements in the set.
  */
 @PublicEvolving
-public final class SetTypeInfo<T> extends TypeInformation<Set<T>> {
+public class SetTypeInfo<T> extends TypeInformation<Set<T>> {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index c1acc304829..9846ee40e85 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -42,7 +42,6 @@ import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -1989,15 +1988,16 @@ public class TypeExtractor {
                     Class<?> valueClass = (Class<?>) actualTypeArguments[1];
                     TypeInformation<?> keyTypeInfo = createTypeInfo(keyClass);
                     TypeInformation<?> valueTypeInfo = 
createTypeInfo(valueClass);
-                    return (TypeInformation<OUT>) Types.MAP(keyTypeInfo, 
valueTypeInfo);
+                    return (TypeInformation<OUT>)
+                            new NullableMapTypeInfo<>(keyTypeInfo, 
valueTypeInfo);
                 } else if (clazz.isAssignableFrom(List.class)) {
                     Class<?> elementClass = (Class<?>) actualTypeArguments[0];
                     TypeInformation<?> elementTypeInfo = 
createTypeInfo(elementClass);
-                    return (TypeInformation<OUT>) Types.LIST(elementTypeInfo);
+                    return (TypeInformation<OUT>) new 
NullableListTypeInfo<>(elementTypeInfo);
                 } else if (clazz.isAssignableFrom(Set.class)) {
                     Class<?> elementClass = (Class<?>) actualTypeArguments[0];
                     TypeInformation<?> elementTypeInfo = 
createTypeInfo(elementClass);
-                    return (TypeInformation<OUT>) Types.SET(elementTypeInfo);
+                    return (TypeInformation<OUT>) new 
NullableSetTypeInfo<>(elementTypeInfo);
                 }
             }
         }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableListSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableListSerializerTest.java
new file mode 100644
index 00000000000..9f3e59e00fd
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableListSerializerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.NullableListTypeInfo;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+/** A test for the serializer of {@link NullableListTypeInfo}. */
+class NullableListSerializerTest extends SerializerTestBase<List<Long>> {
+
+    @Override
+    protected TypeSerializer<List<Long>> createSerializer() {
+        return new NullableListTypeInfo<>(BasicTypeInfo.LONG_TYPE_INFO)
+                .createSerializer(new SerializerConfigImpl());
+    }
+
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Class<List<Long>> getTypeClass() {
+        return (Class<List<Long>>) (Class<?>) List.class;
+    }
+
+    @SuppressWarnings({"unchecked"})
+    @Override
+    protected List<Long>[] getTestData() {
+        final Random rnd = new Random(123654789);
+
+        // null list
+        final List<Long> list1 = null;
+
+        // empty lists
+        final List<Long> list2 = Collections.emptyList();
+        final List<Long> list3 = new ArrayList<>();
+
+        // single element lists
+        final List<Long> list4 = Collections.singletonList(55L);
+        final List<Long> list5 = new ArrayList<>();
+        list5.add(777888L);
+
+        // longer list with null value
+        final List<Long> list6 = new ArrayList<>();
+        for (int i = 0; i < rnd.nextInt(200); i++) {
+            list6.add(rnd.nextLong());
+        }
+        list6.add(null);
+
+        return (List<Long>[]) new List[] {list1, list2, list3, list4, list5, 
list6};
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableMapSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableMapSerializerTest.java
new file mode 100644
index 00000000000..069e858f31e
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableMapSerializerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.NullableMapTypeInfo;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/** A test for the serializer of {@link NullableMapTypeInfo}. */
+class NullableMapSerializerTest extends SerializerTestBase<Map<Long, String>> {
+
+    @Override
+    protected TypeSerializer<Map<Long, String>> createSerializer() {
+        return new NullableMapTypeInfo<>(
+                        BasicTypeInfo.LONG_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO)
+                .createSerializer(new SerializerConfigImpl());
+    }
+
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Class<Map<Long, String>> getTypeClass() {
+        return (Class<Map<Long, String>>) (Class<?>) Map.class;
+    }
+
+    @SuppressWarnings({"unchecked"})
+    @Override
+    protected Map<Long, String>[] getTestData() {
+        final Random rnd = new Random(123654789);
+
+        // null map
+        final Map<Long, String> map1 = null;
+
+        // empty maps
+        final Map<Long, String> map2 = Collections.emptyMap();
+        final Map<Long, String> map3 = new HashMap<>();
+
+        // single element maps
+        final Map<Long, String> map4 = Collections.singletonMap(0L, "hello");
+        final Map<Long, String> map5 = new HashMap<>();
+        map5.put(12345L, "12345L");
+
+        // longer maps with null key and null value
+        final Map<Long, String> map6 = new HashMap<>();
+        for (int i = 0; i < rnd.nextInt(200); i++) {
+            map6.put(rnd.nextLong(), Long.toString(rnd.nextLong()));
+        }
+        map6.put(rnd.nextLong(), null);
+        map6.put(null, Long.toString(rnd.nextLong()));
+
+        return (Map<Long, String>[]) new Map[] {map1, map2, map3, map4, map5, 
map6};
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableSetSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableSetSerializerTest.java
new file mode 100644
index 00000000000..d74cafd4460
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/NullableSetSerializerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.NullableSetTypeInfo;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+/** A test for the serializer of {@link NullableSetTypeInfo}. */
+class NullableSetSerializerTest extends SerializerTestBase<Set<Long>> {
+
+    @Override
+    protected TypeSerializer<Set<Long>> createSerializer() {
+        return new NullableSetTypeInfo<>(BasicTypeInfo.LONG_TYPE_INFO)
+                .createSerializer(new SerializerConfigImpl());
+    }
+
+    @Override
+    protected int getLength() {
+        return -1;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Class<Set<Long>> getTypeClass() {
+        return (Class<Set<Long>>) (Class<?>) Set.class;
+    }
+
+    @SuppressWarnings({"unchecked"})
+    @Override
+    protected Set<Long>[] getTestData() {
+        final Random rnd = new Random(123654789);
+
+        // null set
+        final Set<Long> set1 = null;
+
+        // empty sets
+        final Set<Long> set2 = Collections.emptySet();
+        final Set<Long> set3 = new HashSet<>();
+
+        // single element sets
+        final Set<Long> set4 = Collections.singleton(55L);
+        final Set<Long> set5 = new HashSet<>();
+        set5.add(12345L);
+
+        // longer sets with null value
+        final Set<Long> set6 = new HashSet<>();
+        for (int i = 0; i < rnd.nextInt(200); i++) {
+            set6.add(rnd.nextLong());
+        }
+        set6.add(null);
+
+        return (Set<Long>[]) new Set[] {set1, set2, set3, set4, set5, set6};
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableListTypeInfoTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableListTypeInfoTest.java
new file mode 100644
index 00000000000..7c3172af2a8
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableListTypeInfoTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
+/** Test for {@link NullableListTypeInfoTest}. */
+class NullableListTypeInfoTest extends 
TypeInformationTestBase<NullableListTypeInfo<?>> {
+
+    @Override
+    protected NullableListTypeInfo<?>[] getTestData() {
+        return new NullableListTypeInfo<?>[] {
+            new NullableListTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO),
+            new NullableListTypeInfo<>(BasicTypeInfo.BOOLEAN_TYPE_INFO),
+            new NullableListTypeInfo<>(Object.class)
+        };
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfoTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfoTest.java
new file mode 100644
index 00000000000..ae6b806e446
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableMapTypeInfoTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
+/** Test for {@link NullableMapTypeInfo}. */
+class NullableMapTypeInfoTest extends 
TypeInformationTestBase<NullableMapTypeInfo<?, ?>> {
+
+    @Override
+    protected NullableMapTypeInfo<?, ?>[] getTestData() {
+        return new NullableMapTypeInfo<?, ?>[] {
+            new NullableMapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO),
+            new NullableMapTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO),
+            new NullableMapTypeInfo<>(String.class, Boolean.class)
+        };
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfoTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfoTest.java
new file mode 100644
index 00000000000..f0ade0253a1
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/NullableSetTypeInfoTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+
+/** Test for {@link NullableSetTypeInfo}. */
+class NullableSetTypeInfoTest extends 
TypeInformationTestBase<NullableSetTypeInfo<?>> {
+
+    @Override
+    protected NullableSetTypeInfo<?>[] getTestData() {
+        return new NullableSetTypeInfo<?>[] {
+            new NullableSetTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO),
+            new NullableSetTypeInfo<>(BasicTypeInfo.BOOLEAN_TYPE_INFO),
+            new NullableSetTypeInfo<>(Object.class),
+        };
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
index eef6ba5d868..28977a32488 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
@@ -421,7 +421,8 @@ public class PojoTypeExtractionTest {
                     fail("already seen");
                 }
                 collectionSeen = true;
-                assertThat(field.getTypeInformation()).isEqualTo(new 
ListTypeInfo<>(String.class));
+                assertThat(field.getTypeInformation())
+                        .isEqualTo(new NullableListTypeInfo<>(String.class));
 
             } else {
                 fail("Unexpected field " + field);

Reply via email to