This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e0240c78207 [FLINK-33060][state] Fix the javadoc of ListState interfaces about not allowing null value
e0240c78207 is described below

commit e0240c782073f879a1a1e3fe22b45a38ee499c45
Author: Zakelly <[email protected]>
AuthorDate: Mon Nov 6 11:11:46 2023 +0800

    [FLINK-33060][state] Fix the javadoc of ListState interfaces about not allowing null value
---
 .../org/apache/flink/api/common/state/AppendingState.java  |  3 ++-
 .../java/org/apache/flink/api/common/state/ListState.java  | 14 ++++++++++----
 .../apache/flink/runtime/state/PartitionableListState.java | 10 ++++++++--
 .../flink/runtime/state/internal/InternalListState.java    | 14 ++++++++++----
 .../operators/sorted/state/BatchExecutionKeyListState.java |  2 ++
 5 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
index d020c0b7dcf..b081f86466c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
@@ -58,7 +58,8 @@ public interface AppendingState<IN, OUT> extends State {
      * of values. The next time {@link #get()} is called (for the same state partition) the returned
      * state will represent the updated list.
      *
-     * <p>If null is passed in, the state value will remain unchanged.
+     * <p>If null is passed in, the behaviour is undefined (implementation related).
+     * TODO: An unified behaviour across all sub-classes.
      *
      * @param value The new value for the state.
      * @throws Exception Thrown if the system cannot access the state.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
index 7508054f5c0..3ea5b16fb7f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
@@ -48,10 +48,13 @@ public interface ListState<T> extends MergingState<T, Iterable<T>> {
      * given list of values. The next time {@link #get()} is called (for the same state partition)
      * the returned state will represent the updated list.
      *
-     * <p>If null or an empty list is passed in, the state value will be null.
+     * <p>If an empty list is passed in, the state value will be null.
+     *
+     * <p>Null value passed in or any null value in list is not allowed.
      *
      * @param values The new values for the state.
-     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
+     * @throws Exception The method may forward exception thrown internally (by I/O or functions, or
+     *     sanity check for null value).
      */
     void update(List<T> values) throws Exception;
 
@@ -60,10 +63,13 @@ public interface ListState<T> extends MergingState<T, Iterable<T>> {
      * existing list of values. The next time {@link #get()} is called (for the same state
      * partition) the returned state will represent the updated list.
      *
-     * <p>If null or an empty list is passed in, the state value remains unchanged.
+     * <p>If an empty list is passed in, the state value remains unchanged.
+     *
+     * <p>Null value passed in or any null value in list is not allowed.
      *
      * @param values The new values to be added to the state.
-     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
+     * @throws Exception The method may forward exception thrown internally (by I/O or functions, or
+     *     sanity check for null value).
      */
     void addAll(List<T> values) throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
index 7cb363b91c3..35d6c78e7ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
@@ -29,6 +29,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Implementation of operator list state.
  *
@@ -129,8 +131,12 @@ public final class PartitionableListState<S> implements ListState<S> {
 
     @Override
     public void addAll(List<S> values) {
-        if (values != null && !values.isEmpty()) {
-            internalList.addAll(values);
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        if (!values.isEmpty()) {
+            for (S value : values) {
+                checkNotNull(value, "Any value to add to a list cannot be null.");
+                add(value);
+            }
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
index bfb75a00bb2..48096efa24e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -39,10 +39,13 @@ public interface InternalListState<K, N, T>
      * given list of values. The next time {@link #get()} is called (for the same state partition)
      * the returned state will represent the updated list.
      *
-     * <p>If `null` or an empty list is passed in, the state value will be null
+     * <p>If an empty list is passed in, the state value will be null
+     *
+     * <p>Null value passed in or any null value in list is not allowed.
      *
      * @param values The new values for the state.
-     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
+     * @throws Exception The method may forward exception thrown internally (by I/O or functions, or
+     *     sanity check for null value).
      */
     void update(List<T> values) throws Exception;
 
@@ -51,10 +54,13 @@ public interface InternalListState<K, N, T>
      * existing list of values. The next time {@link #get()} is called (for the same state
      * partition) the returned state will represent the updated list.
      *
-     * <p>If `null` or an empty list is passed in, the state value remains unchanged
+     * <p>If an empty list is passed in, the state value remains unchanged
+     *
+     * <p>Null value passed in or any null value in list is not allowed.
      *
      * @param values The new values to be added to the state.
-     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
+     * @throws Exception The method may forward exception thrown internally (by I/O or functions, or
+     *     sanity check for null value).
      */
     void addAll(List<T> values) throws Exception;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyListState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyListState.java
index 2fecbc2c507..cdc17d68a2c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyListState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyListState.java
@@ -47,6 +47,7 @@ class BatchExecutionKeyListState<K, N, T>
         checkNotNull(values);
         clear();
         for (T value : values) {
+            checkNotNull(value);
             add(value);
         }
     }
@@ -57,6 +58,7 @@ class BatchExecutionKeyListState<K, N, T>
             return;
         }
         for (T value : values) {
+            checkNotNull(value);
             add(value);
         }
     }

Reply via email to