This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 241e819d82b [FLINK-26852][state] Avoid RocksDBMapState#clear swallowing related exception
241e819d82b is described below
commit 241e819d82bf84535b355e3822894d6b685db0a5
Author: Hangxiang Yu <[email protected]>
AuthorDate: Wed Mar 30 15:08:17 2022 +0800
[FLINK-26852][state] Avoid RocksDBMapState#clear swallowing related exception
---
.../contrib/streaming/state/RocksDBMapState.java | 44 +++++++++++-----------
.../state/EmbeddedRocksDBStateBackendTest.java | 23 +++++++++++
2 files changed, 44 insertions(+), 23 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index da68662b869..0bcebb53727 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -278,31 +278,29 @@ class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, U
@Override
public void clear() {
- try {
- try (RocksIteratorWrapper iterator =
- RocksDBOperationUtils.getRocksIterator(
- backend.db, columnFamily, backend.getReadOptions());
- RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper =
- new RocksDBWriteBatchWrapper(
- backend.db,
- backend.getWriteOptions(),
- backend.getWriteBatchSize())) {
-
- final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
- iterator.seek(keyPrefixBytes);
-
- while (iterator.isValid()) {
- byte[] keyBytes = iterator.key();
- if (startWithKeyPrefix(keyPrefixBytes, keyBytes)) {
- rocksDBWriteBatchWrapper.remove(columnFamily, keyBytes);
- } else {
- break;
- }
- iterator.next();
+ try (RocksIteratorWrapper iterator =
+ RocksDBOperationUtils.getRocksIterator(
+ backend.db, columnFamily, backend.getReadOptions());
+ RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper =
+ new RocksDBWriteBatchWrapper(
+ backend.db,
+ backend.getWriteOptions(),
+ backend.getWriteBatchSize())) {
+
+ final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
+ iterator.seek(keyPrefixBytes);
+
+ while (iterator.isValid()) {
+ byte[] keyBytes = iterator.key();
+ if (startWithKeyPrefix(keyPrefixBytes, keyBytes)) {
+ rocksDBWriteBatchWrapper.remove(columnFamily, keyBytes);
+ } else {
+ break;
}
+ iterator.next();
}
- } catch (Exception e) {
- LOG.warn("Error while cleaning the state.", e);
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException("Error while cleaning the state in RocksDB.", e);
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index ca591dd20cf..3ce02e16678 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -45,6 +47,7 @@ import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.SupplierWithException;
@@ -64,6 +67,7 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.RocksObject;
import org.rocksdb.Snapshot;
@@ -619,6 +623,25 @@ public class EmbeddedRocksDBStateBackendTest
}
}
+ @Test(expected = FlinkRuntimeException.class)
+ public void testMapStateClear() throws Exception {
+ setupRocksKeyedStateBackend();
+ MapStateDescriptor<Integer, String> kvId =
+ new MapStateDescriptor<>("id", Integer.class, String.class);
+ MapState<Integer, String> state =
+ keyedStateBackend.getPartitionedState(
+ VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ doAnswer(
+ invocationOnMock -> {
+ throw new RocksDBException("Artificial failure");
+ })
+ .when(keyedStateBackend.db)
+ .newIterator(any(ColumnFamilyHandle.class), any(ReadOptions.class));
+
+ state.clear();
+ }
+
private void runStateUpdates() throws Exception {
for (int i = 50; i < 150; ++i) {
if (i % 10 == 0) {