aljoscha closed pull request #5749: [FLINK-9058] Relax ListState.addAll() and
ListState.update() to take Iterable
URL: https://github.com/apache/flink/pull/5749
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index b226ff1360a..5c12793ce2c 100644
---
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -717,18 +717,20 @@ boolean isClearCalled() {
}
@Override
- public void update(List<T> values) throws Exception {
+ public void update(Iterable<T> values) throws Exception {
clear();
addAll(values);
}
@Override
- public void addAll(List<T> values) throws Exception {
+ public void addAll(Iterable<T> values) throws Exception {
if (values != null) {
values.forEach(v ->
Preconditions.checkNotNull(v, "You cannot add null to a ListState."));
- list.addAll(values);
+ for (T v : values) {
+ list.add(v);
+ }
}
}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
index 254dc1d6140..38b556d64c4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
@@ -20,8 +20,6 @@
import org.apache.flink.annotation.PublicEvolving;
-import java.util.List;
-
/**
* {@link State} interface for partitioned list state in Operations.
* The state is accessed and modified by user functions, and checkpointed
consistently
@@ -48,7 +46,7 @@
*
* @throws Exception The method may forward exception thrown internally
(by I/O or functions).
*/
- void update(List<T> values) throws Exception;
+ void update(Iterable<T> values) throws Exception;
/**
* Updates the operator state accessible by {@link #get()} by adding
the given values
@@ -61,5 +59,5 @@
*
* @throws Exception The method may forward exception thrown internally
(by I/O or functions).
*/
- void addAll(List<T> values) throws Exception;
+ void addAll(Iterable<T> values) throws Exception;
}
diff --git
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
index 9f1465eafa3..2c10befbd5c 100644
---
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
+++
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
@@ -69,12 +69,12 @@ public void clear() {
}
@Override
- public void update(List<V> values) throws Exception {
+ public void update(Iterable<V> values) throws Exception {
throw MODIFICATION_ATTEMPT_ERROR;
}
@Override
- public void addAll(List<V> values) throws Exception {
+ public void addAll(Iterable<V> values) throws Exception {
throw MODIFICATION_ATTEMPT_ERROR;
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 01a397ae6c1..625937dcaf0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -691,17 +691,19 @@ public String toString() {
}
@Override
- public void update(List<S> values) throws Exception {
+ public void update(Iterable<S> values) {
internalList.clear();
addAll(values);
}
@Override
- public void addAll(List<S> values) throws Exception {
- if (values != null && !values.isEmpty()) {
- internalList.addAll(values);
+ public void addAll(Iterable<S> values) {
+ for (S v : values) {
+ Preconditions.checkNotNull(v, "You cannot add
null to a ListState.");
+ internalList.add(v);
}
+
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
index 71f5aa5a912..60c0549831d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
@@ -21,7 +21,6 @@
import org.apache.flink.api.common.state.ListState;
import java.util.Collections;
-import java.util.List;
/**
* Simple wrapper list state that exposes empty state properly as an empty
list.
@@ -57,12 +56,12 @@ public void clear() {
}
@Override
- public void update(List<T> values) throws Exception {
+ public void update(Iterable<T> values) throws Exception {
originalState.update(values);
}
@Override
- public void addAll(List<T> values) throws Exception {
+ public void addAll(Iterable<T> values) throws Exception {
originalState.addAll(values);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index f7b5cd2d5f0..d2df8199308 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -120,38 +120,35 @@ public void add(V value) {
}
@Override
- public void update(List<V> values) throws Exception {
+ public void update(Iterable<V> values) {
Preconditions.checkNotNull(values, "List of values to add
cannot be null.");
- if (values.isEmpty()) {
- clear();
- return;
- }
-
List<V> newStateList = new ArrayList<>();
for (V v : values) {
Preconditions.checkNotNull(v, "You cannot add null to a
ListState.");
newStateList.add(v);
}
- stateTable.put(currentNamespace, newStateList);
+ if (newStateList.isEmpty()) {
+ clear();
+ } else {
+ stateTable.put(currentNamespace, newStateList);
+ }
}
@Override
- public void addAll(List<V> values) throws Exception {
+ public void addAll(Iterable<V> values) throws Exception {
Preconditions.checkNotNull(values, "List of values to add
cannot be null.");
- if (!values.isEmpty()) {
- stateTable.transform(currentNamespace, values,
(previousState, value) -> {
- if (previousState == null) {
- previousState = new ArrayList<>();
- }
- for (V v : value) {
- Preconditions.checkNotNull(v, "You
cannot add null to a ListState.");
- previousState.add(v);
- }
- return previousState;
- });
- }
+ stateTable.transform(currentNamespace, values, (previousState,
value) -> {
+ if (previousState == null) {
+ previousState = new ArrayList<>();
+ }
+ for (V v : value) {
+ Preconditions.checkNotNull(v, "You cannot add
null to a ListState.");
+ previousState.add(v);
+ }
+ return previousState;
+ });
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
index 1e22dc6eea6..52438467460 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -20,8 +20,6 @@
import org.apache.flink.api.common.state.ListState;
-import java.util.List;
-
/**
* The peer to the {@link ListState} in the internal state type hierarchy.
*
@@ -42,7 +40,7 @@
*
* @throws Exception The method may forward exception thrown internally
(by I/O or functions).
*/
- void update(List<T> values) throws Exception;
+ void update(Iterable<T> values) throws Exception;
/**
* Updates the operator state accessible by {@link #get()} by adding
the given values
@@ -55,5 +53,5 @@
*
* @throws Exception The method may forward exception thrown internally
(by I/O or functions).
*/
- void addAll(List<T> values) throws Exception;
+ void addAll(Iterable<T> values) throws Exception;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 11ae389da73..1ef91e25d87 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1353,7 +1353,7 @@ public void testListStateAddNull() throws Exception {
/**
* This test verifies that all ListState implementations are consistent
in not allowing
- * {@link ListState#addAll(List)} to be called with {@code null}
entries in the list of entries
+ * {@link ListState#addAll(Iterable)} to be called with {@code null}
entries in the list of entries
* to add.
*/
@Test
@@ -1387,7 +1387,7 @@ public void testListStateAddAllNullEntries() throws
Exception {
/**
* This test verifies that all ListState implementations are consistent
in not allowing
- * {@link ListState#addAll(List)} to be called with {@code null}.
+ * {@link ListState#addAll(Iterable)} to be called with {@code null}.
*/
@Test
public void testListStateAddAllNull() throws Exception {
@@ -1415,7 +1415,7 @@ public void testListStateAddAllNull() throws Exception {
/**
* This test verifies that all ListState implementations are consistent
in not allowing
- * {@link ListState#addAll(List)} to be called with {@code null}
entries in the list of entries
+ * {@link ListState#addAll(Iterable)} to be called with {@code null}
entries in the list of entries
* to add.
*/
@Test
@@ -1449,7 +1449,7 @@ public void testListStateUpdateNullEntries() throws
Exception {
/**
* This test verifies that all ListState implementations are consistent
in not allowing
- * {@link ListState#addAll(List)} to be called with {@code null}.
+ * {@link ListState#addAll(Iterable)} to be called with {@code null}.
*/
@Test
public void testListStateUpdateNull() throws Exception {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 62c169b059e..9b446c09116 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -157,50 +157,46 @@ public void mergeNamespaces(N target, Collection<N>
sources) throws Exception {
}
@Override
- public void update(List<V> values) throws Exception {
+ public void update(Iterable<V> values) throws Exception {
Preconditions.checkNotNull(values, "List of values to add
cannot be null.");
clear();
- if (!values.isEmpty()) {
- try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key =
keySerializationStream.toByteArray();
+ try {
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
- byte[] premerge = getPreMergedValue(values);
- if (premerge != null) {
- backend.db.put(columnFamily,
writeOptions, key, premerge);
- } else {
- throw new IOException("Failed pre-merge
values in update()");
- }
- } catch (IOException | RocksDBException e) {
- throw new RuntimeException("Error while
updating data to RocksDB", e);
+ byte[] premerge = getPreMergedValue(values);
+ if (premerge != null) {
+ backend.db.put(columnFamily, writeOptions, key,
premerge);
+ } else {
+ throw new IOException("Failed pre-merge values
in update()");
}
+ } catch (IOException | RocksDBException e) {
+ throw new RuntimeException("Error while updating data
to RocksDB", e);
}
}
@Override
- public void addAll(List<V> values) throws Exception {
+ public void addAll(Iterable<V> values) throws Exception {
Preconditions.checkNotNull(values, "List of values to add
cannot be null.");
- if (!values.isEmpty()) {
- try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key =
keySerializationStream.toByteArray();
+ try {
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
- byte[] premerge = getPreMergedValue(values);
- if (premerge != null) {
- backend.db.merge(columnFamily,
writeOptions, key, premerge);
- } else {
- throw new IOException("Failed pre-merge
values in addAll()");
- }
- } catch (IOException | RocksDBException e) {
- throw new RuntimeException("Error while
updating data to RocksDB", e);
+ byte[] premerge = getPreMergedValue(values);
+ if (premerge != null) {
+ backend.db.merge(columnFamily, writeOptions,
key, premerge);
+ } else {
+ throw new IOException("Failed pre-merge values
in addAll()");
}
+ } catch (IOException | RocksDBException e) {
+ throw new RuntimeException("Error while updating data
to RocksDB", e);
}
}
- private byte[] getPreMergedValue(List<V> values) throws IOException {
+ private byte[] getPreMergedValue(Iterable<V> values) throws IOException
{
DataOutputViewStreamWrapper out = new
DataOutputViewStreamWrapper(keySerializationStream);
keySerializationStream.reset();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services