This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2f7099e77e4eba45e2337e9afbaecb6eb7c62467 Author: wangfeifan <zoltar9...@163.com> AuthorDate: Thu May 23 16:40:12 2024 +0800 [FLINK-35434][statebackend] ForSt supports to pass db exception to runtime --- .../apache/flink/state/forst/ForStDBGetRequest.java | 4 ++++ .../apache/flink/state/forst/ForStDBPutRequest.java | 4 ++++ .../state/forst/ForStGeneralMultiGetOperation.java | 21 ++++++++++++++++----- .../flink/state/forst/ForStStateExecutor.java | 19 ++++++++++++++++++- .../flink/state/forst/ForStWriteBatchOperation.java | 8 +++++++- 5 files changed, 49 insertions(+), 7 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java index 7bd8400192c..11868defb17 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBGetRequest.java @@ -59,6 +59,10 @@ public class ForStDBGetRequest<K, V> { future.complete(value); } + public void completeStateFutureExceptionally(String message, Throwable ex) { + future.completeExceptionally(message, ex); + } + static <K, V> ForStDBGetRequest<K, V> of( K key, ForStInnerTable<K, V> table, InternalStateFuture<V> future) { return new ForStDBGetRequest<>(key, table, future); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java index 6e9821aeaa5..cc868b8ebb3 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBPutRequest.java @@ -68,6 +68,10 @@ public class ForStDBPutRequest<K, V> { future.complete(null); } + public void completeStateFutureExceptionally(String message, Throwable ex) { + future.completeExceptionally(message, ex); + } + /** * If the value of the ForStDBPutRequest is null, then the request will signify the deletion of * the data associated with that key. diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java index ce74b574b7b..77885d1a55a 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * The general-purpose multiGet operation implementation for ForStDB, which simulates multiGet by @@ -50,21 +51,31 @@ public class ForStGeneralMultiGetOperation implements ForStDBOperation { @Override public CompletableFuture<Void> process() { + // TODO: Use MultiGet to optimize this implement CompletableFuture<Void> future = new CompletableFuture<>(); + AtomicReference<Exception> error = new AtomicReference<>(); AtomicInteger counter = new AtomicInteger(batchRequest.size()); for (int i = 0; i < batchRequest.size(); i++) { ForStDBGetRequest<?, ?> request = batchRequest.get(i); executor.execute( () -> { try { - byte[] key = request.buildSerializedKey(); - byte[] value = db.get(request.getColumnFamilyHandle(), key); - request.completeStateFuture(value); + if (error.get() == null) { + byte[] key = request.buildSerializedKey(); + byte[] value = db.get(request.getColumnFamilyHandle(), key); + request.completeStateFuture(value); + } else { + request.completeStateFutureExceptionally( + "Error already occurred in other state request of the same " + + "group, failed the state request directly", + error.get()); + } } catch (Exception e) { - LOG.warn( - "Error when process general multiGet operation for forStDB", e); + error.set(e); + request.completeStateFutureExceptionally( + "Error when execute ForStDb get operation", e); future.completeExceptionally(e); } finally { if (counter.decrementAndGet() == 0 diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java index 0f2972214b2..e96ee32010d 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java @@ -58,6 +58,8 @@ public class ForStStateExecutor implements StateExecutor { private final WriteOptions writeOptions; + private Throwable executionError; + public ForStStateExecutor(int ioParallelism, RocksDB db, WriteOptions writeOptions) { this.coordinatorThread = Executors.newSingleThreadScheduledExecutor( @@ -72,6 +74,7 @@ public class ForStStateExecutor implements StateExecutor { @Override public CompletableFuture<Void> executeBatchRequests( StateRequestContainer stateRequestContainer) { + checkState(); Preconditions.checkArgument(stateRequestContainer instanceof ForStStateRequestClassifier); ForStStateRequestClassifier stateRequestClassifier = (ForStStateRequestClassifier) stateRequestContainer; @@ -108,16 +111,30 @@ public class ForStStateExecutor implements StateExecutor { duration); resultFuture.complete(null); }, - coordinatorThread); + coordinatorThread) + .exceptionally( + e -> { + executionError = e; + resultFuture.completeExceptionally(e); + return null; + }); }); return resultFuture; } @Override public StateRequestContainer createStateRequestContainer() { + checkState(); return new ForStStateRequestClassifier(); } + private void checkState() { + if (executionError != null) { + throw new IllegalStateException( + "previous state request already failed : ", executionError); + } + } + @Override public void shutdown() { workerThreads.shutdown(); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java index 90e811c7ccb..3d4e59e3247 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java @@ -75,7 +75,13 @@ public class ForStWriteBatchOperation implements ForStDBOperation { request.completeStateFuture(); } } catch (Exception e) { - throw new CompletionException("Error while adding data to ForStDB", e); + String msg = "Error while write batch data to ForStDB."; + for (ForStDBPutRequest<?, ?> request : batchRequest) { + // fail every state request in this batch + request.completeStateFutureExceptionally(msg, e); + } + // fail the whole batch operation + throw new CompletionException(msg, e); } }, executor);