masteryhx commented on code in PR #24739:
URL: https://github.com/apache/flink/pull/24739#discussion_r1592121657
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java:
##########
@@ -31,75 +30,50 @@
/**
* The general-purpose multiGet operation implementation for ForStDB, which
simulates multiGet by
* calling the Get API multiple times with multiple threads.
- *
- * @param <K> The type of key in get access request.
- * @param <V> The type of value in get access request.
*/
-public class ForStGeneralMultiGetOperation<K, V> implements
ForStDBOperation<List<V>> {
+public class ForStGeneralMultiGetOperation implements ForStDBOperation {
private static final Logger LOG =
LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);
private final RocksDB db;
- private final List<GetRequest<K, V>> batchRequest;
+ private final List<ForStDBGetRequest<?, ?>> batchRequest;
private final Executor executor;
ForStGeneralMultiGetOperation(
- RocksDB db, List<GetRequest<K, V>> batchRequest, Executor
executor) {
+ RocksDB db, List<ForStDBGetRequest<?, ?>> batchRequest, Executor
executor) {
this.db = db;
this.batchRequest = batchRequest;
this.executor = executor;
}
@Override
- public CompletableFuture<List<V>> process() {
+ public CompletableFuture<Void> process() {
- CompletableFuture<List<V>> future = new CompletableFuture<>();
- @SuppressWarnings("unchecked")
- V[] result = (V[]) new Object[batchRequest.size()];
- Arrays.fill(result, null);
+ CompletableFuture<Void> future = new CompletableFuture<>();
AtomicInteger counter = new AtomicInteger(batchRequest.size());
for (int i = 0; i < batchRequest.size(); i++) {
- GetRequest<K, V> request = batchRequest.get(i);
- final int index = i;
+ ForStDBGetRequest<?, ?> request = batchRequest.get(i);
executor.execute(
() -> {
try {
- ForStInnerTable<K, V> table = request.table;
- byte[] key = table.serializeKey(request.key);
- byte[] value =
db.get(table.getColumnFamilyHandle(), key);
- if (value != null) {
- result[index] = table.deserializeValue(value);
- }
+ byte[] key = request.buildSerializedKey();
+ byte[] value =
db.get(request.getColumnFamilyHandle(), key);
+ request.completeStateFuture(value);
} catch (Exception e) {
LOG.warn(
"Error when process general multiGet
operation for forStDB", e);
future.completeExceptionally(e);
Review Comment:
Okay, I think it's reasonable to consider it together with the handling mechanism
of AEC later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]