ljz2051 commented on code in PR #24739:
URL: https://github.com/apache/flink/pull/24739#discussion_r1590951299
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java:
##########
@@ -31,75 +30,50 @@
/**
* The general-purpose multiGet operation implementation for ForStDB, which simulates multiGet by
* calling the Get API multiple times with multiple threads.
- *
- * @param <K> The type of key in get access request.
- * @param <V> The type of value in get access request.
*/
-public class ForStGeneralMultiGetOperation<K, V> implements ForStDBOperation<List<V>> {
+public class ForStGeneralMultiGetOperation implements ForStDBOperation {
private static final Logger LOG =
LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);
private final RocksDB db;
- private final List<GetRequest<K, V>> batchRequest;
+ private final List<ForStDBGetRequest<?, ?>> batchRequest;
private final Executor executor;
ForStGeneralMultiGetOperation(
- RocksDB db, List<GetRequest<K, V>> batchRequest, Executor executor) {
+ RocksDB db, List<ForStDBGetRequest<?, ?>> batchRequest, Executor executor) {
this.db = db;
this.batchRequest = batchRequest;
this.executor = executor;
}
@Override
- public CompletableFuture<List<V>> process() {
+ public CompletableFuture<Void> process() {
- CompletableFuture<List<V>> future = new CompletableFuture<>();
- @SuppressWarnings("unchecked")
- V[] result = (V[]) new Object[batchRequest.size()];
- Arrays.fill(result, null);
+ CompletableFuture<Void> future = new CompletableFuture<>();
AtomicInteger counter = new AtomicInteger(batchRequest.size());
for (int i = 0; i < batchRequest.size(); i++) {
- GetRequest<K, V> request = batchRequest.get(i);
- final int index = i;
+ ForStDBGetRequest<?, ?> request = batchRequest.get(i);
executor.execute(
() -> {
try {
- ForStInnerTable<K, V> table = request.table;
- byte[] key = table.serializeKey(request.key);
- byte[] value =
db.get(table.getColumnFamilyHandle(), key);
- if (value != null) {
- result[index] = table.deserializeValue(value);
- }
+ byte[] key = request.buildSerializedKey();
+ byte[] value =
db.get(request.getColumnFamilyHandle(), key);
+ request.completeStateFuture(value);
} catch (Exception e) {
LOG.warn(
"Error when process general multiGet operation for forStDB", e);
future.completeExceptionally(e);
Review Comment:
Yes, I agree that the `InternalStateFuture` should be completed
exceptionally here.
However, the `AsyncExecutionController` currently has no exception-handling
mechanism for `InternalStateFuture`. I would prefer to handle failures of
state request execution holistically, at the controller level, in a
separate PR.
WDYT?
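
For illustration only, the per-request failure handling discussed here could
look roughly like the sketch below. It is not the PR's code: `GetRequest`,
`Lookup`, and `MultiGetFailureSketch` are hypothetical stand-ins, and a plain
`CompletableFuture` models `InternalStateFuture`, whose real API may differ.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiGetFailureSketch {

    /** Hypothetical stand-in for ForStDBGetRequest; not the real Flink class. */
    public static final class GetRequest {
        public final byte[] key;
        // Assumption: models InternalStateFuture with a plain CompletableFuture.
        public final CompletableFuture<byte[]> stateFuture = new CompletableFuture<>();

        public GetRequest(byte[] key) {
            this.key = key;
        }
    }

    /** Hypothetical stand-in for db.get(columnFamilyHandle, key), which may throw. */
    public interface Lookup {
        byte[] get(byte[] key) throws Exception;
    }

    public static CompletableFuture<Void> process(
            List<GetRequest> batch, Lookup db, Executor executor) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (batch.isEmpty()) {
            future.complete(null);
            return future;
        }
        AtomicInteger counter = new AtomicInteger(batch.size());
        for (GetRequest request : batch) {
            executor.execute(
                    () -> {
                        try {
                            request.stateFuture.complete(db.get(request.key));
                        } catch (Exception e) {
                            // Fail the individual request as well as the whole batch,
                            // so that nobody waits forever on the per-request future.
                            request.stateFuture.completeExceptionally(e);
                            future.completeExceptionally(e);
                        } finally {
                            // complete(null) is a no-op if the batch already failed.
                            if (counter.decrementAndGet() == 0) {
                                future.complete(null);
                            }
                        }
                    });
        }
        return future;
    }
}
```

With this shape, a failed lookup leaves the other requests in the batch
unaffected, while the batch-level future still reports the first error.
Deciding what the caller does with that error is the part that would belong
to the separate, controller-level change.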