This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 241e819d82b [FLINK-26852][state] Avoid RocksDBMapState#clear swallowing related exception
241e819d82b is described below

commit 241e819d82bf84535b355e3822894d6b685db0a5
Author: Hangxiang Yu <[email protected]>
AuthorDate: Wed Mar 30 15:08:17 2022 +0800

    [FLINK-26852][state] Avoid RocksDBMapState#clear swallowing related exception
---
 .../contrib/streaming/state/RocksDBMapState.java   | 44 +++++++++++-----------
 .../state/EmbeddedRocksDBStateBackendTest.java     | 23 +++++++++++
 2 files changed, 44 insertions(+), 23 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index da68662b869..0bcebb53727 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -278,31 +278,29 @@ class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, U
 
     @Override
     public void clear() {
-        try {
-            try (RocksIteratorWrapper iterator =
-                            RocksDBOperationUtils.getRocksIterator(
-                                    backend.db, columnFamily, backend.getReadOptions());
-                    RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper =
-                            new RocksDBWriteBatchWrapper(
-                                    backend.db,
-                                    backend.getWriteOptions(),
-                                    backend.getWriteBatchSize())) {
-
-                final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
-                iterator.seek(keyPrefixBytes);
-
-                while (iterator.isValid()) {
-                    byte[] keyBytes = iterator.key();
-                    if (startWithKeyPrefix(keyPrefixBytes, keyBytes)) {
-                        rocksDBWriteBatchWrapper.remove(columnFamily, keyBytes);
-                    } else {
-                        break;
-                    }
-                    iterator.next();
+        try (RocksIteratorWrapper iterator =
+                        RocksDBOperationUtils.getRocksIterator(
+                                backend.db, columnFamily, backend.getReadOptions());
+                RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper =
+                        new RocksDBWriteBatchWrapper(
+                                backend.db,
+                                backend.getWriteOptions(),
+                                backend.getWriteBatchSize())) {
+
+            final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
+            iterator.seek(keyPrefixBytes);
+
+            while (iterator.isValid()) {
+                byte[] keyBytes = iterator.key();
+                if (startWithKeyPrefix(keyPrefixBytes, keyBytes)) {
+                    rocksDBWriteBatchWrapper.remove(columnFamily, keyBytes);
+                } else {
+                    break;
                 }
+                iterator.next();
             }
-        } catch (Exception e) {
-            LOG.warn("Error while cleaning the state.", e);
+        } catch (RocksDBException e) {
+            throw new FlinkRuntimeException("Error while cleaning the state in RocksDB.", e);
         }
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index ca591dd20cf..3ce02e16678 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -45,6 +47,7 @@ import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
 import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -64,6 +67,7 @@ import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.RocksObject;
 import org.rocksdb.Snapshot;
@@ -619,6 +623,25 @@ public class EmbeddedRocksDBStateBackendTest
         }
     }
 
+    @Test(expected = FlinkRuntimeException.class)
+    public void testMapStateClear() throws Exception {
+        setupRocksKeyedStateBackend();
+        MapStateDescriptor<Integer, String> kvId =
+                new MapStateDescriptor<>("id", Integer.class, String.class);
+        MapState<Integer, String> state =
+                keyedStateBackend.getPartitionedState(
+                        VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+        doAnswer(
+                        invocationOnMock -> {
+                            throw new RocksDBException("Artificial failure");
+                        })
+                .when(keyedStateBackend.db)
+                .newIterator(any(ColumnFamilyHandle.class), any(ReadOptions.class));
+
+        state.clear();
+    }
+
     private void runStateUpdates() throws Exception {
         for (int i = 50; i < 150; ++i) {
             if (i % 10 == 0) {

Reply via email to