Repository: flink
Updated Branches:
  refs/heads/master fafd5b6ad -> f99c4dd39


[FLINK-8411] Don't allow null in ListState.add()/addAll()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f99c4dd3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f99c4dd3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f99c4dd3

Branch: refs/heads/master
Commit: f99c4dd395b71877eb70e7fc743d109957205a3d
Parents: fafd5b6
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Feb 14 12:04:20 2018 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Sat Feb 17 08:33:54 2018 +0100

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       |   3 +
 .../streaming/state/RocksDBListState.java       |  14 +-
 .../state/DefaultOperatorStateBackend.java      |   1 +
 .../flink/runtime/state/heap/HeapListState.java |  35 ++--
 .../runtime/state/StateBackendTestBase.java     | 160 +++++++++++++++++--
 5 files changed, 186 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f99c4dd3/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 5040966..403e627 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -696,6 +696,7 @@ public class FlinkKafkaConsumerBaseTest {
 
                @Override
                public void add(T value) throws Exception {
+                       Preconditions.checkNotNull(value, "You cannot add null 
to a ListState.");
                        list.add(value);
                }
 
@@ -717,6 +718,8 @@ public class FlinkKafkaConsumerBaseTest {
                @Override
                public void addAll(List<T> values) throws Exception {
                        if (values != null) {
+                               values.forEach(v -> 
Preconditions.checkNotNull(v, "You cannot add null to a ListState."));
+
                                list.addAll(values);
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f99c4dd3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 413615b..f0481ec 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
@@ -112,9 +113,7 @@ public class RocksDBListState<K, N, V>
 
        @Override
        public void add(V value) throws IOException {
-               if (value == null) {
-                       return;
-               }
+               Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
 
                try {
                        writeCurrentKeyWithGroupAndNamespace();
@@ -169,9 +168,11 @@ public class RocksDBListState<K, N, V>
 
        @Override
        public void update(List<V> values) throws Exception {
+               Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+
                clear();
 
-               if (values != null && !values.isEmpty()) {
+               if (!values.isEmpty()) {
                        try {
                                writeCurrentKeyWithGroupAndNamespace();
                                byte[] key = 
keySerializationStream.toByteArray();
@@ -190,7 +191,9 @@ public class RocksDBListState<K, N, V>
 
        @Override
        public void addAll(List<V> values) throws Exception {
-               if (values != null && !values.isEmpty()) {
+               Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+
+               if (!values.isEmpty()) {
                        try {
                                writeCurrentKeyWithGroupAndNamespace();
                                byte[] key = 
keySerializationStream.toByteArray();
@@ -213,6 +216,7 @@ public class RocksDBListState<K, N, V>
                keySerializationStream.reset();
                boolean first = true;
                for (V value : values) {
+                       Preconditions.checkNotNull(value, "You cannot add null 
to a ListState.");
                        if (first) {
                                first = false;
                        } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/f99c4dd3/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index f486643..266483f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -660,6 +660,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
                @Override
                public void add(S value) {
+                       Preconditions.checkNotNull(value, "You cannot add null 
to a ListState.");
                        internalList.add(value);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f99c4dd3/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index dfc7362..f7b5cd2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -67,9 +67,7 @@ public class HeapListState<K, N, V>
 
        @Override
        public void add(V value) {
-               if (value == null) {
-                       return;
-               }
+               Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
 
                final N namespace = currentNamespace;
 
@@ -123,23 +121,36 @@ public class HeapListState<K, N, V>
 
        @Override
        public void update(List<V> values) throws Exception {
-               if (values != null && !values.isEmpty()) {
-                       stateTable.put(currentNamespace, new 
ArrayList<>(values));
-               } else {
+               Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+
+               if (values.isEmpty()) {
                        clear();
+                       return;
                }
+
+               List<V> newStateList = new ArrayList<>();
+               for (V v : values) {
+                       Preconditions.checkNotNull(v, "You cannot add null to a 
ListState.");
+                       newStateList.add(v);
+               }
+
+               stateTable.put(currentNamespace, newStateList);
        }
 
        @Override
        public void addAll(List<V> values) throws Exception {
-               if (values != null && !values.isEmpty()) {
+               Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+
+               if (!values.isEmpty()) {
                        stateTable.transform(currentNamespace, values, 
(previousState, value) -> {
-                               if (previousState != null) {
-                                       previousState.addAll(value);
-                                       return previousState;
-                               } else {
-                                       return new ArrayList<>(value);
+                               if (previousState == null) {
+                                       previousState = new ArrayList<>();
+                               }
+                               for (V v : value) {
+                                       Preconditions.checkNotNull(v, "You 
cannot add null to a ListState.");
+                                       previousState.add(v);
                                }
+                               return previousState;
                        });
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f99c4dd3/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index ad69ae8..8acefa4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1295,36 +1295,179 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                backend.dispose();
        }
 
+       /**
+        * This test verifies that all ListState implementations are consistent 
in not allowing
+        * adding {@code null}.
+        */
        @Test
-       public void testListStateAPIs() throws Exception {
-
+       public void testListStateAddNull() throws Exception {
                AbstractKeyedStateBackend<String> keyedBackend = 
createKeyedBackend(StringSerializer.INSTANCE);
 
                final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
 
                try {
                        ListState<Long> state =
-                               
keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, stateDescr);
+                               keyedBackend.getPartitionedState(
+                                       VoidNamespace.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE,
+                                       stateDescr);
 
                        keyedBackend.setCurrentKey("abc");
                        assertNull(state.get());
+
+                       expectedException.expect(NullPointerException.class);
                        state.add(null);
+               } finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       /**
+        * This test verifies that all ListState implementations are consistent 
in not allowing
+        * {@link ListState#addAll(List)} to be called with {@code null} 
entries in the list of entries
+        * to add.
+        */
+       @Test
+       public void testListStateAddAllNullEntries() throws Exception {
+               AbstractKeyedStateBackend<String> keyedBackend = 
createKeyedBackend(StringSerializer.INSTANCE);
+
+               final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
+
+               try {
+                       ListState<Long> state =
+                               keyedBackend.getPartitionedState(
+                                       VoidNamespace.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE,
+                                       stateDescr);
+
+                       keyedBackend.setCurrentKey("abc");
                        assertNull(state.get());
 
+                       expectedException.expect(NullPointerException.class);
+
+                       List<Long> adding = new ArrayList<>();
+                       adding.add(3L);
+                       adding.add(null);
+                       adding.add(5L);
+                       state.addAll(adding);
+               } finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       /**
+        * This test verifies that all ListState implementations are consistent 
in not allowing
+        * {@link ListState#addAll(List)} to be called with {@code null}.
+        */
+       @Test
+       public void testListStateAddAllNull() throws Exception {
+               AbstractKeyedStateBackend<String> keyedBackend = 
createKeyedBackend(StringSerializer.INSTANCE);
+
+               final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
+
+               try {
+                       ListState<Long> state =
+                               keyedBackend.getPartitionedState(
+                                       VoidNamespace.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE,
+                                       stateDescr);
+
+                       keyedBackend.setCurrentKey("abc");
+                       assertNull(state.get());
+
+                       expectedException.expect(NullPointerException.class);
+                       state.addAll(null);
+               } finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       /**
+        * This test verifies that all ListState implementations are consistent 
in not allowing
+        * {@link ListState#addAll(List)} to be called with {@code null} 
entries in the list of entries
+        * to add.
+        */
+       @Test
+       public void testListStateUpdateNullEntries() throws Exception {
+               AbstractKeyedStateBackend<String> keyedBackend = 
createKeyedBackend(StringSerializer.INSTANCE);
+
+               final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
+
+               try {
+                       ListState<Long> state =
+                               keyedBackend.getPartitionedState(
+                                       VoidNamespace.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE,
+                                       stateDescr);
+
+                       keyedBackend.setCurrentKey("abc");
+                       assertNull(state.get());
+
+                       expectedException.expect(NullPointerException.class);
+
+                       List<Long> adding = new ArrayList<>();
+                       adding.add(3L);
+                       adding.add(null);
+                       adding.add(5L);
+                       state.update(adding);
+               } finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       /**
+        * This test verifies that all ListState implementations are consistent 
in not allowing
+        * {@link ListState#addAll(List)} to be called with {@code null}.
+        */
+       @Test
+       public void testListStateUpdateNull() throws Exception {
+               AbstractKeyedStateBackend<String> keyedBackend = 
createKeyedBackend(StringSerializer.INSTANCE);
+
+               final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
+
+               try {
+                       ListState<Long> state =
+                               keyedBackend.getPartitionedState(
+                                       VoidNamespace.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE,
+                                       stateDescr);
+
+                       keyedBackend.setCurrentKey("abc");
+                       assertNull(state.get());
+
+                       expectedException.expect(NullPointerException.class);
+                       state.update(null);
+               } finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       @Test
+       public void testListStateAPIs() throws Exception {
+
+               AbstractKeyedStateBackend<String> keyedBackend = 
createKeyedBackend(StringSerializer.INSTANCE);
+
+               final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
+
+               try {
+                       ListState<Long> state =
+                               
keyedBackend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, stateDescr);
+
                        keyedBackend.setCurrentKey("def");
                        assertNull(state.get());
                        state.add(17L);
                        state.add(11L);
                        assertThat(state.get(), containsInAnyOrder(17L, 11L));
-                       // update(null) should remain the value null
-                       state.update(null);
-                       assertNull(state.get());
                        // update(emptyList) should remain the value null
                        state.update(Collections.emptyList());
                        assertNull(state.get());
                        state.update(Arrays.asList(10L, 16L));
                        assertThat(state.get(), containsInAnyOrder(16L, 10L));
-                       state.add(null);
                        assertThat(state.get(), containsInAnyOrder(16L, 10L));
 
                        keyedBackend.setCurrentKey("abc");
@@ -1332,13 +1475,11 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        keyedBackend.setCurrentKey("g");
                        assertNull(state.get());
-                       state.addAll(null);
                        assertNull(state.get());
                        state.addAll(Collections.emptyList());
                        assertNull(state.get());
                        state.addAll(Arrays.asList(3L, 4L));
                        assertThat(state.get(), containsInAnyOrder(3L, 4L));
-                       state.addAll(null);
                        assertThat(state.get(), containsInAnyOrder(3L, 4L));
                        state.addAll(new ArrayList<>());
                        assertThat(state.get(), containsInAnyOrder(3L, 4L));
@@ -1347,7 +1488,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.addAll(new ArrayList<>());
                        assertThat(state.get(), containsInAnyOrder(3L, 4L, 5L, 
6L));
 
-                       state.add(null);
                        assertThat(state.get(), containsInAnyOrder(3L, 4L, 5L, 
6L));
                        state.update(Arrays.asList(1L, 2L));
                        assertThat(state.get(), containsInAnyOrder(1L, 2L));

Reply via email to