masteryhx commented on code in PR #24739:
URL: https://github.com/apache/flink/pull/24739#discussion_r1592121657


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java:
##########
@@ -31,75 +30,50 @@
 /**
  * The general-purpose multiGet operation implementation for ForStDB, which 
simulates multiGet by
  * calling the Get API multiple times with multiple threads.
- *
- * @param <K> The type of key in get access request.
- * @param <V> The type of value in get access request.
  */
-public class ForStGeneralMultiGetOperation<K, V> implements 
ForStDBOperation<List<V>> {
+public class ForStGeneralMultiGetOperation implements ForStDBOperation {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);
 
     private final RocksDB db;
 
-    private final List<GetRequest<K, V>> batchRequest;
+    private final List<ForStDBGetRequest<?, ?>> batchRequest;
 
     private final Executor executor;
 
     ForStGeneralMultiGetOperation(
-            RocksDB db, List<GetRequest<K, V>> batchRequest, Executor 
executor) {
+            RocksDB db, List<ForStDBGetRequest<?, ?>> batchRequest, Executor 
executor) {
         this.db = db;
         this.batchRequest = batchRequest;
         this.executor = executor;
     }
 
     @Override
-    public CompletableFuture<List<V>> process() {
+    public CompletableFuture<Void> process() {
 
-        CompletableFuture<List<V>> future = new CompletableFuture<>();
-        @SuppressWarnings("unchecked")
-        V[] result = (V[]) new Object[batchRequest.size()];
-        Arrays.fill(result, null);
+        CompletableFuture<Void> future = new CompletableFuture<>();
 
         AtomicInteger counter = new AtomicInteger(batchRequest.size());
         for (int i = 0; i < batchRequest.size(); i++) {
-            GetRequest<K, V> request = batchRequest.get(i);
-            final int index = i;
+            ForStDBGetRequest<?, ?> request = batchRequest.get(i);
             executor.execute(
                     () -> {
                         try {
-                            ForStInnerTable<K, V> table = request.table;
-                            byte[] key = table.serializeKey(request.key);
-                            byte[] value = 
db.get(table.getColumnFamilyHandle(), key);
-                            if (value != null) {
-                                result[index] = table.deserializeValue(value);
-                            }
+                            byte[] key = request.buildSerializedKey();
+                            byte[] value = 
db.get(request.getColumnFamilyHandle(), key);
+                            request.completeStateFuture(value);
                         } catch (Exception e) {
                             LOG.warn(
                                     "Error when process general multiGet 
operation for forStDB", e);
                             future.completeExceptionally(e);

Review Comment:
   Okay, I think it's reasonble to consider it together with handling mechanism 
of AEC later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to