This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f33abffe1da [FLINK-34975][state/forst] Support multiget for forstdb (#25363)
f33abffe1da is described below
commit f33abffe1daccd2f3d14b110031cf684ff394abf
Author: Yanfei Lei <[email protected]>
AuthorDate: Wed Sep 25 20:41:30 2024 +0800
[FLINK-34975][state/forst] Support multiget for forstdb (#25363)
---
.../state/forst/ForStGeneralMultiGetOperation.java | 146 ++++++++++++++++++++-
.../flink/state/forst/ForStStateExecutor.java | 6 +-
.../forst/ForStGeneralMultiGetOperationTest.java | 4 +-
3 files changed, 146 insertions(+), 10 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
index 4f4a445dbee..6dc5904c1b5 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java
@@ -18,8 +18,12 @@
package org.apache.flink.state.forst;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -33,39 +37,167 @@ import java.util.concurrent.atomic.AtomicReference;
public class ForStGeneralMultiGetOperation implements ForStDBOperation {
private final RocksDB db;

/** All requests of this batch, in submission order. */
private final List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest;

/** Plain get requests cut into non-empty groups; each group is served by one MultiGet task. */
private final List<List<ForStDBGetRequest<?, ?, ?, ?>>> splitRequests;

/** Map-check requests, which are served by one task per request. */
private final List<ForStDBGetRequest<?, ?, ?, ?>> mapCheckRequests;

/** Executor that runs the read tasks submitted by {@code process()}. */
private final Executor executor;

/** Callback run when a submitted task finishes; may be {@code null}. */
private final Runnable subProcessFinished;

/** Upper bound on the number of MultiGet groups the plain get requests are split into. */
private final int readIoParallelism;

/**
 * Creates an operation that puts all plain get requests into a single MultiGet group and has no
 * sub-process completion callback.
 *
 * @param db the database to read from
 * @param batchRequest the requests to serve
 * @param executor executor that runs the read tasks
 */
ForStGeneralMultiGetOperation(
        RocksDB db, List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest, Executor executor) {
    this(db, batchRequest, executor, 1, null);
}

/**
 * Creates an operation and immediately classifies {@code batchRequest} into map-check requests
 * and at most {@code readIoParallelism} groups of plain get requests.
 *
 * @param db the database to read from
 * @param batchRequest the requests to serve
 * @param executor executor that runs the read tasks
 * @param readIoParallelism maximum number of MultiGet groups for the plain get requests
 * @param subProcessFinished callback run when a submitted task finishes, or {@code null}
 */
ForStGeneralMultiGetOperation(
        RocksDB db,
        List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest,
        Executor executor,
        int readIoParallelism,
        Runnable subProcessFinished) {
    this.db = db;
    this.batchRequest = batchRequest;
    this.executor = executor;
    this.subProcessFinished = subProcessFinished;
    this.readIoParallelism = readIoParallelism;
    this.splitRequests = new ArrayList<>();
    this.mapCheckRequests = new ArrayList<>();
    // Runs last because it reads batchRequest and readIoParallelism.
    classifyAndSplitRequests(splitRequests, mapCheckRequests);
}
@Override
public CompletableFuture<Void> process() {
- // TODO: Use MultiGet to optimize this implement
CompletableFuture<Void> future = new CompletableFuture<>();
-
AtomicReference<Exception> error = new AtomicReference<>();
AtomicInteger counter = new AtomicInteger(batchRequest.size());
+
+ processOneByOne(mapCheckRequests, error, counter, future);
+ for (List<ForStDBGetRequest<?, ?, ?, ?>> getRequests : splitRequests) {
+ executor.execute(
+ () -> {
+ try {
+ ReadOptions readOptions = new ReadOptions();
+ readOptions.setReadaheadSize(0);
+ List<byte[]> keys = new
ArrayList<>(getRequests.size());
+ List<ColumnFamilyHandle> columnFamilyHandles =
+ new ArrayList<>(getRequests.size());
+
+ for (int i = 0; i < getRequests.size(); i++) {
+ ForStDBGetRequest<?, ?, ?, ?> request =
getRequests.get(i);
+ try {
+ if (error.get() == null) {
+ byte[] key =
request.buildSerializedKey();
+ keys.add(key);
+
columnFamilyHandles.add(request.getColumnFamilyHandle());
+ } else {
+ completeExceptionallyRequest(
+ request,
+ "Error already occurred in
other state request of the same group, failed the state request directly",
+ error.get());
+ }
+ } catch (IOException e) {
+ error.set(e);
+ completeExceptionallyRequest(
+ request,
+ "Error when execute ForStDb
serialized get key",
+ e);
+ future.completeExceptionally(e);
+ }
+ }
+ if (error.get() != null) {
+ return;
+ }
+ List<byte[]> values = null;
+ try {
+ values = db.multiGetAsList(readOptions,
columnFamilyHandles, keys);
+ } catch (Exception e) {
+ error.set(e);
+ future.completeExceptionally(e);
+ for (int i = 0; i < getRequests.size(); i++) {
+ completeExceptionallyRequest(
+ getRequests.get(i), "Error
occurred when multiGet", e);
+ }
+ }
+ if (error.get() != null) {
+ return;
+ }
+ for (int i = 0; i < getRequests.size(); i++) {
+ ForStDBGetRequest<?, ?, ?, ?> request =
getRequests.get(i);
+ try {
+ if (error.get() == null) {
+
request.completeStateFuture(values.get(i));
+ } else {
+ completeExceptionallyRequest(
+ request,
+ "Error already occurred in
other state request of the same "
+ + "group, failed the
state request directly",
+ error.get());
+ }
+ } catch (Exception e) {
+ error.set(e);
+ completeExceptionallyRequest(
+ request, "Error when complete get
future.", e);
+ future.completeExceptionally(e);
+ }
+ }
+
+ if (counter.addAndGet(-getRequests.size()) == 0
+ && !future.isCompletedExceptionally()) {
+ future.complete(null);
+ }
+
+ } finally {
+ if (subProcessFinished != null) {
+ subProcessFinished.run();
+ }
+ }
+ });
+ }
+ return future;
+ }
+
+ private void completeExceptionallyRequest(
+ ForStDBGetRequest<?, ?, ?, ?> request, String message, Exception
e) {
+ request.completeStateFutureExceptionally(message, e);
+ }
+
+ private void classifyAndSplitRequests(
+ List<List<ForStDBGetRequest<?, ?, ?, ?>>> splitRequests,
+ List<ForStDBGetRequest<?, ?, ?, ?>> mapCheckRequests) {
+ List<ForStDBGetRequest<?, ?, ?, ?>> getRequests = new ArrayList<>();
for (int i = 0; i < batchRequest.size(); i++) {
ForStDBGetRequest<?, ?, ?, ?> request = batchRequest.get(i);
+ if (request instanceof ForStDBMapCheckRequest) {
+ mapCheckRequests.add(request);
+ } else {
+ getRequests.add(request);
+ }
+ }
+
+ for (int p = 0; p < readIoParallelism; p++) {
+ int startIndex = getRequests.size() * p / readIoParallelism;
+ int endIndex = getRequests.size() * (p + 1) / readIoParallelism;
+ if (startIndex < endIndex) {
+ splitRequests.add(new ArrayList<>());
+ }
+ for (int i = startIndex; i < endIndex; i++) {
+ splitRequests.get(splitRequests.size() -
1).add(getRequests.get(i));
+ }
+ }
+ }
+
+ private void processOneByOne(
+ List<ForStDBGetRequest<?, ?, ?, ?>> requests,
+ AtomicReference<Exception> error,
+ AtomicInteger counter,
+ CompletableFuture<Void> future) {
+ for (int i = 0; i < requests.size(); i++) {
+ ForStDBGetRequest<?, ?, ?, ?> request = requests.get(i);
executor.execute(
() -> {
try {
@@ -84,7 +216,8 @@ public class ForStGeneralMultiGetOperation implements ForStDBOperation {
future.completeExceptionally(e);
} finally {
if (counter.decrementAndGet() == 0
- && !future.isCompletedExceptionally()) {
+ && !future.isCompletedExceptionally()
+ && !future.isDone()) {
future.complete(null);
}
if (subProcessFinished != null) {
@@ -93,11 +226,10 @@ public class ForStGeneralMultiGetOperation implements ForStDBOperation {
}
});
}
- return future;
}
@Override
public int subProcessCount() {
- return batchRequest.size();
+ return mapCheckRequests.size() + splitRequests.size();
}
}
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
index 6b2ecf0e747..30b6fa4e513 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
@@ -151,7 +151,11 @@ public class ForStStateExecutor implements StateExecutor {
if (!getRequests.isEmpty()) {
ForStGeneralMultiGetOperation getOperations =
new ForStGeneralMultiGetOperation(
- db, getRequests, readThreads,
ongoing::decrementAndGet);
+ db,
+ getRequests,
+ readThreads,
+ readThreadCount,
+ ongoing::decrementAndGet);
// sub process count should -1, since we have added 1
on top.
ongoing.addAndGet(getOperations.subProcessCount() - 1);
futures.add(getOperations.process());
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
index 9847c40d19f..2585691f43e 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperationTest.java
@@ -63,9 +63,9 @@ public class ForStGeneralMultiGetOperationTest extends ForStDBOperationTestBase
db.put(request.getColumnFamilyHandle(), keyBytes, valueBytes);
}
- ExecutorService executor = Executors.newFixedThreadPool(4);
+ ExecutorService executor = Executors.newFixedThreadPool(3);
ForStGeneralMultiGetOperation generalMultiGetOperation =
- new ForStGeneralMultiGetOperation(db, batchGetRequest,
executor);
+ new ForStGeneralMultiGetOperation(db, batchGetRequest,
executor, 3, null);
generalMultiGetOperation.process().get();
for (Tuple2<String, TestStateFuture<String>> tuple : resultCheckList) {