This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f33abffe1da [FLINK-34975][state/forst] Support multiget for forstdb (#25363)
f33abffe1da is described below

commit f33abffe1daccd2f3d14b110031cf684ff394abf
Author: Yanfei Lei <[email protected]>
AuthorDate: Wed Sep 25 20:41:30 2024 +0800

    [FLINK-34975][state/forst] Support multiget for forstdb (#25363)
---
 .../state/forst/ForStGeneralMultiGetOperation.java | 146 ++++++++++++++++++++-
 .../flink/state/forst/ForStStateExecutor.java      |   6 +-
 .../forst/ForStGeneralMultiGetOperationTest.java   |   4 +-
 3 files changed, 146 insertions(+), 10 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
index 4f4a445dbee..6dc5904c1b5 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.state.forst;
 
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -33,39 +37,167 @@ import java.util.concurrent.atomic.AtomicReference;
 public class ForStGeneralMultiGetOperation implements ForStDBOperation {
 
     private final RocksDB db;
-
     private final List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest;
 
+    List<List<ForStDBGetRequest<?, ?, ?, ?>>> splitRequests;
+    List<ForStDBGetRequest<?, ?, ?, ?>> mapCheckRequests;
+
     private final Executor executor;
 
     private final Runnable subProcessFinished;
 
+    private final int readIoParallelism;
+
     ForStGeneralMultiGetOperation(
             RocksDB db, List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest, 
Executor executor) {
-        this(db, batchRequest, executor, null);
+        this(db, batchRequest, executor, 1, null);
     }
 
     ForStGeneralMultiGetOperation(
             RocksDB db,
             List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest,
             Executor executor,
+            int readIoParallelism,
             Runnable subProcessFinished) {
         this.db = db;
         this.batchRequest = batchRequest;
         this.executor = executor;
         this.subProcessFinished = subProcessFinished;
+        this.readIoParallelism = readIoParallelism;
+        this.splitRequests = new ArrayList<>();
+        this.mapCheckRequests = new ArrayList<>();
+        classifyAndSplitRequests(splitRequests, mapCheckRequests);
     }
 
     @Override
     public CompletableFuture<Void> process() {
-        // TODO: Use MultiGet to optimize this implement
 
         CompletableFuture<Void> future = new CompletableFuture<>();
-
         AtomicReference<Exception> error = new AtomicReference<>();
         AtomicInteger counter = new AtomicInteger(batchRequest.size());
+
+        processOneByOne(mapCheckRequests, error, counter, future);
+        for (List<ForStDBGetRequest<?, ?, ?, ?>> getRequests : splitRequests) {
+            executor.execute(
+                    () -> {
+                        try {
+                            ReadOptions readOptions = new ReadOptions();
+                            readOptions.setReadaheadSize(0);
+                            List<byte[]> keys = new 
ArrayList<>(getRequests.size());
+                            List<ColumnFamilyHandle> columnFamilyHandles =
+                                    new ArrayList<>(getRequests.size());
+
+                            for (int i = 0; i < getRequests.size(); i++) {
+                                ForStDBGetRequest<?, ?, ?, ?> request = 
getRequests.get(i);
+                                try {
+                                    if (error.get() == null) {
+                                        byte[] key = 
request.buildSerializedKey();
+                                        keys.add(key);
+                                        
columnFamilyHandles.add(request.getColumnFamilyHandle());
+                                    } else {
+                                        completeExceptionallyRequest(
+                                                request,
+                                                "Error already occurred in 
other state request of the same group, failed the state request directly",
+                                                error.get());
+                                    }
+                                } catch (IOException e) {
+                                    error.set(e);
+                                    completeExceptionallyRequest(
+                                            request,
+                                            "Error when execute ForStDb 
serialized get key",
+                                            e);
+                                    future.completeExceptionally(e);
+                                }
+                            }
+                            if (error.get() != null) {
+                                return;
+                            }
+                            List<byte[]> values = null;
+                            try {
+                                values = db.multiGetAsList(readOptions, 
columnFamilyHandles, keys);
+                            } catch (Exception e) {
+                                error.set(e);
+                                future.completeExceptionally(e);
+                                for (int i = 0; i < getRequests.size(); i++) {
+                                    completeExceptionallyRequest(
+                                            getRequests.get(i), "Error 
occurred when multiGet", e);
+                                }
+                            }
+                            if (error.get() != null) {
+                                return;
+                            }
+                            for (int i = 0; i < getRequests.size(); i++) {
+                                ForStDBGetRequest<?, ?, ?, ?> request = 
getRequests.get(i);
+                                try {
+                                    if (error.get() == null) {
+                                        
request.completeStateFuture(values.get(i));
+                                    } else {
+                                        completeExceptionallyRequest(
+                                                request,
+                                                "Error already occurred in 
other state request of the same "
+                                                        + "group, failed the 
state request directly",
+                                                error.get());
+                                    }
+                                } catch (Exception e) {
+                                    error.set(e);
+                                    completeExceptionallyRequest(
+                                            request, "Error when complete get 
future.", e);
+                                    future.completeExceptionally(e);
+                                }
+                            }
+
+                            if (counter.addAndGet(-getRequests.size()) == 0
+                                    && !future.isCompletedExceptionally()) {
+                                future.complete(null);
+                            }
+
+                        } finally {
+                            if (subProcessFinished != null) {
+                                subProcessFinished.run();
+                            }
+                        }
+                    });
+        }
+        return future;
+    }
+
+    private void completeExceptionallyRequest(
+            ForStDBGetRequest<?, ?, ?, ?> request, String message, Exception 
e) {
+        request.completeStateFutureExceptionally(message, e);
+    }
+
+    private void classifyAndSplitRequests(
+            List<List<ForStDBGetRequest<?, ?, ?, ?>>> splitRequests,
+            List<ForStDBGetRequest<?, ?, ?, ?>> mapCheckRequests) {
+        List<ForStDBGetRequest<?, ?, ?, ?>> getRequests = new ArrayList<>();
         for (int i = 0; i < batchRequest.size(); i++) {
             ForStDBGetRequest<?, ?, ?, ?> request = batchRequest.get(i);
+            if (request instanceof ForStDBMapCheckRequest) {
+                mapCheckRequests.add(request);
+            } else {
+                getRequests.add(request);
+            }
+        }
+
+        for (int p = 0; p < readIoParallelism; p++) {
+            int startIndex = getRequests.size() * p / readIoParallelism;
+            int endIndex = getRequests.size() * (p + 1) / readIoParallelism;
+            if (startIndex < endIndex) {
+                splitRequests.add(new ArrayList<>());
+            }
+            for (int i = startIndex; i < endIndex; i++) {
+                splitRequests.get(splitRequests.size() - 
1).add(getRequests.get(i));
+            }
+        }
+    }
+
+    private void processOneByOne(
+            List<ForStDBGetRequest<?, ?, ?, ?>> requests,
+            AtomicReference<Exception> error,
+            AtomicInteger counter,
+            CompletableFuture<Void> future) {
+        for (int i = 0; i < requests.size(); i++) {
+            ForStDBGetRequest<?, ?, ?, ?> request = requests.get(i);
             executor.execute(
                     () -> {
                         try {
@@ -84,7 +216,8 @@ public class ForStGeneralMultiGetOperation implements ForStDBOperation {
                             future.completeExceptionally(e);
                         } finally {
                             if (counter.decrementAndGet() == 0
-                                    && !future.isCompletedExceptionally()) {
+                                    && !future.isCompletedExceptionally()
+                                    && !future.isDone()) {
                                 future.complete(null);
                             }
                             if (subProcessFinished != null) {
@@ -93,11 +226,10 @@ public class ForStGeneralMultiGetOperation implements ForStDBOperation {
                         }
                     });
         }
-        return future;
     }
 
     @Override
     public int subProcessCount() {
-        return batchRequest.size();
+        return mapCheckRequests.size() + splitRequests.size();
     }
 }
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
index 6b2ecf0e747..30b6fa4e513 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
@@ -151,7 +151,11 @@ public class ForStStateExecutor implements StateExecutor {
                     if (!getRequests.isEmpty()) {
                         ForStGeneralMultiGetOperation getOperations =
                                 new ForStGeneralMultiGetOperation(
-                                        db, getRequests, readThreads, 
ongoing::decrementAndGet);
+                                        db,
+                                        getRequests,
+                                        readThreads,
+                                        readThreadCount,
+                                        ongoing::decrementAndGet);
                         // sub process count should -1, since we have added 1 
on top.
                         ongoing.addAndGet(getOperations.subProcessCount() - 1);
                         futures.add(getOperations.process());
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
index 9847c40d19f..2585691f43e 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
@@ -63,9 +63,9 @@ public class ForStGeneralMultiGetOperationTest extends ForStDBOperationTestBase
             db.put(request.getColumnFamilyHandle(), keyBytes, valueBytes);
         }
 
-        ExecutorService executor = Executors.newFixedThreadPool(4);
+        ExecutorService executor = Executors.newFixedThreadPool(3);
         ForStGeneralMultiGetOperation generalMultiGetOperation =
-                new ForStGeneralMultiGetOperation(db, batchGetRequest, 
executor);
+                new ForStGeneralMultiGetOperation(db, batchGetRequest, 
executor, 3, null);
         generalMultiGetOperation.process().get();
 
         for (Tuple2<String, TestStateFuture<String>> tuple : resultCheckList) {

Reply via email to