This is an automated email from the ASF dual-hosted git repository.
hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e0240c78207 [FLINK-33060][state] Fix the javadoc of ListState
interfaces about not allowing null value
e0240c78207 is described below
commit e0240c782073f879a1a1e3fe22b45a38ee499c45
Author: Zakelly <[email protected]>
AuthorDate: Mon Nov 6 11:11:46 2023 +0800
[FLINK-33060][state] Fix the javadoc of ListState interfaces about not
allowing null value
---
.../org/apache/flink/api/common/state/AppendingState.java | 3 ++-
.../java/org/apache/flink/api/common/state/ListState.java | 14 ++++++++++----
.../apache/flink/runtime/state/PartitionableListState.java | 10 ++++++++--
.../flink/runtime/state/internal/InternalListState.java | 14 ++++++++++----
.../operators/sorted/state/BatchExecutionKeyListState.java | 2 ++
5 files changed, 32 insertions(+), 11 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
index d020c0b7dcf..b081f86466c 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
@@ -58,7 +58,8 @@ public interface AppendingState<IN, OUT> extends State {
* of values. The next time {@link #get()} is called (for the same state
partition) the returned
* state will represent the updated list.
*
- * <p>If null is passed in, the state value will remain unchanged.
+ * <p>If null is passed in, the behaviour is undefined (implementation-dependent).
+ * TODO: Unify the behaviour across all sub-classes.
*
* @param value The new value for the state.
* @throws Exception Thrown if the system cannot access the state.
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
index 7508054f5c0..3ea5b16fb7f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
@@ -48,10 +48,13 @@ public interface ListState<T> extends MergingState<T,
Iterable<T>> {
* given list of values. The next time {@link #get()} is called (for the
same state partition)
* the returned state will represent the updated list.
*
- * <p>If null or an empty list is passed in, the state value will be null.
+ * <p>If an empty list is passed in, the state value will be null.
+ *
+ * <p>Passing in null, or a list that contains a null value, is not allowed.
*
* @param values The new values for the state.
- * @throws Exception The method may forward exception thrown internally
(by I/O or functions).
+ * @throws Exception The method may forward exception thrown internally
(by I/O or functions, or
+ * sanity check for null value).
*/
void update(List<T> values) throws Exception;
@@ -60,10 +63,13 @@ public interface ListState<T> extends MergingState<T,
Iterable<T>> {
* existing list of values. The next time {@link #get()} is called (for
the same state
* partition) the returned state will represent the updated list.
*
- * <p>If null or an empty list is passed in, the state value remains
unchanged.
+ * <p>If an empty list is passed in, the state value remains unchanged.
+ *
+ * <p>Passing in null, or a list that contains a null value, is not allowed.
*
* @param values The new values to be added to the state.
- * @throws Exception The method may forward exception thrown internally
(by I/O or functions).
+ * @throws Exception The method may forward exception thrown internally
(by I/O or functions, or
+ * sanity check for null value).
*/
void addAll(List<T> values) throws Exception;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
index 7cb363b91c3..35d6c78e7ef 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java
@@ -29,6 +29,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Implementation of operator list state.
*
@@ -129,8 +131,12 @@ public final class PartitionableListState<S> implements
ListState<S> {
@Override
public void addAll(List<S> values) {
- if (values != null && !values.isEmpty()) {
- internalList.addAll(values);
+ Preconditions.checkNotNull(values, "List of values to add cannot be
null.");
+ if (!values.isEmpty()) {
+ for (S value : values) {
+ checkNotNull(value, "Any value to add to a list cannot be
null.");
+ add(value);
+ }
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
index bfb75a00bb2..48096efa24e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -39,10 +39,13 @@ public interface InternalListState<K, N, T>
* given list of values. The next time {@link #get()} is called (for the
same state partition)
* the returned state will represent the updated list.
*
- * <p>If `null` or an empty list is passed in, the state value will be null
+ * <p>If an empty list is passed in, the state value will be null.
+ *
+ * <p>Passing in null, or a list that contains a null value, is not allowed.
*
* @param values The new values for the state.
- * @throws Exception The method may forward exception thrown internally
(by I/O or functions).
+ * @throws Exception The method may forward exception thrown internally
(by I/O or functions, or
+ * sanity check for null value).
*/
void update(List<T> values) throws Exception;
@@ -51,10 +54,13 @@ public interface InternalListState<K, N, T>
* existing list of values. The next time {@link #get()} is called (for
the same state
* partition) the returned state will represent the updated list.
*
- * <p>If `null` or an empty list is passed in, the state value remains
unchanged
+ * <p>If an empty list is passed in, the state value remains unchanged.
+ *
+ * <p>Passing in null, or a list that contains a null value, is not allowed.
*
* @param values The new values to be added to the state.
- * @throws Exception The method may forward exception thrown internally
(by I/O or functions).
+ * @throws Exception The method may forward exception thrown internally
(by I/O or functions, or
+ * sanity check for null value).
*/
void addAll(List<T> values) throws Exception;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyListState.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyListState.java
index 2fecbc2c507..cdc17d68a2c 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyListState.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyListState.java
@@ -47,6 +47,7 @@ class BatchExecutionKeyListState<K, N, T>
checkNotNull(values);
clear();
for (T value : values) {
+ checkNotNull(value);
add(value);
}
}
@@ -57,6 +58,7 @@ class BatchExecutionKeyListState<K, N, T>
return;
}
for (T value : values) {
+ checkNotNull(value);
add(value);
}
}