This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f7099e77e4eba45e2337e9afbaecb6eb7c62467
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Thu May 23 16:40:12 2024 +0800

    [FLINK-35434][statebackend] ForSt supports to pass db exception to runtime
---
 .../apache/flink/state/forst/ForStDBGetRequest.java |  4 ++++
 .../apache/flink/state/forst/ForStDBPutRequest.java |  4 ++++
 .../state/forst/ForStGeneralMultiGetOperation.java  | 21 ++++++++++++++++-----
 .../flink/state/forst/ForStStateExecutor.java       | 19 ++++++++++++++++++-
 .../flink/state/forst/ForStWriteBatchOperation.java |  8 +++++++-
 5 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java
index 7bd8400192c..11868defb17 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java
@@ -59,6 +59,10 @@ public class ForStDBGetRequest<K, V> {
         future.complete(value);
     }
 
+    public void completeStateFutureExceptionally(String message, Throwable ex) {
+        future.completeExceptionally(message, ex);
+    }
+
     static <K, V> ForStDBGetRequest<K, V> of(
             K key, ForStInnerTable<K, V> table, InternalStateFuture<V> future) {
         return new ForStDBGetRequest<>(key, table, future);
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java
index 6e9821aeaa5..cc868b8ebb3 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java
@@ -68,6 +68,10 @@ public class ForStDBPutRequest<K, V> {
         future.complete(null);
     }
 
+    public void completeStateFutureExceptionally(String message, Throwable ex) {
+        future.completeExceptionally(message, ex);
+    }
+
     /**
      * If the value of the ForStDBPutRequest is null, then the request will signify the deletion of
      * the data associated with that key.
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
index ce74b574b7b..77885d1a55a 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * The general-purpose multiGet operation implementation for ForStDB, which simulates multiGet by
@@ -50,21 +51,31 @@ public class ForStGeneralMultiGetOperation implements ForStDBOperation {
 
     @Override
     public CompletableFuture<Void> process() {
+        // TODO: Use MultiGet to optimize this implement
 
         CompletableFuture<Void> future = new CompletableFuture<>();
 
+        AtomicReference<Exception> error = new AtomicReference<>();
         AtomicInteger counter = new AtomicInteger(batchRequest.size());
         for (int i = 0; i < batchRequest.size(); i++) {
             ForStDBGetRequest<?, ?> request = batchRequest.get(i);
             executor.execute(
                     () -> {
                         try {
-                            byte[] key = request.buildSerializedKey();
-                            byte[] value = db.get(request.getColumnFamilyHandle(), key);
-                            request.completeStateFuture(value);
+                            if (error.get() == null) {
+                                byte[] key = request.buildSerializedKey();
+                                byte[] value = db.get(request.getColumnFamilyHandle(), key);
+                                request.completeStateFuture(value);
+                            } else {
+                                request.completeStateFutureExceptionally(
+                                        "Error already occurred in other state request of the same "
+                                                + "group, failed the state request directly",
+                                        error.get());
+                            }
                         } catch (Exception e) {
-                            LOG.warn(
-                                    "Error when process general multiGet operation for forStDB", e);
+                            error.set(e);
+                            request.completeStateFutureExceptionally(
+                                    "Error when execute ForStDb get operation", e);
                             future.completeExceptionally(e);
                         } finally {
                             if (counter.decrementAndGet() == 0
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
index 0f2972214b2..e96ee32010d 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
@@ -58,6 +58,8 @@ public class ForStStateExecutor implements StateExecutor {
 
     private final WriteOptions writeOptions;
 
+    private Throwable executionError;
+
     public ForStStateExecutor(int ioParallelism, RocksDB db, WriteOptions writeOptions) {
         this.coordinatorThread =
                 Executors.newSingleThreadScheduledExecutor(
@@ -72,6 +74,7 @@ public class ForStStateExecutor implements StateExecutor {
     @Override
     public CompletableFuture<Void> executeBatchRequests(
             StateRequestContainer stateRequestContainer) {
+        checkState();
         Preconditions.checkArgument(stateRequestContainer instanceof ForStStateRequestClassifier);
         ForStStateRequestClassifier stateRequestClassifier =
                 (ForStStateRequestClassifier) stateRequestContainer;
@@ -108,16 +111,30 @@ public class ForStStateExecutor implements StateExecutor {
                                                 duration);
                                         resultFuture.complete(null);
                                     },
-                                    coordinatorThread);
+                                    coordinatorThread)
+                            .exceptionally(
+                                    e -> {
+                                        executionError = e;
+                                        resultFuture.completeExceptionally(e);
+                                        return null;
+                                    });
                 });
         return resultFuture;
     }
 
     @Override
     public StateRequestContainer createStateRequestContainer() {
+        checkState();
         return new ForStStateRequestClassifier();
     }
 
+    private void checkState() {
+        if (executionError != null) {
+            throw new IllegalStateException(
+                    "previous state request already failed : ", executionError);
+        }
+    }
+
     @Override
     public void shutdown() {
         workerThreads.shutdown();
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java
index 90e811c7ccb..3d4e59e3247 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java
@@ -75,7 +75,13 @@ public class ForStWriteBatchOperation implements ForStDBOperation {
                             request.completeStateFuture();
                         }
                     } catch (Exception e) {
-                        throw new CompletionException("Error while adding data to ForStDB", e);
+                        String msg = "Error while write batch data to ForStDB.";
+                        for (ForStDBPutRequest<?, ?> request : batchRequest) {
+                            // fail every state request in this batch
+                            request.completeStateFutureExceptionally(msg, e);
+                        }
+                        // fail the whole batch operation
+                        throw new CompletionException(msg, e);
                     }
                 },
                 executor);

Reply via email to