This is an automated email from the ASF dual-hosted git repository.

popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git


The following commit(s) were added to refs/heads/main by this push:
     new a7f85e520 Refactor BatchMutationCall to optimize task batching and execution flow (#230)
a7f85e520 is described below

commit a7f85e5201488a71ce70d703b169a5d46194e8ea
Author: Yonny(Yu) Hao <[email protected]>
AuthorDate: Thu Mar 5 09:44:13 2026 +0800

    Refactor BatchMutationCall to optimize task batching and execution flow (#230)
---
 .../basekv/client/scheduler/BatchMutationCall.java |  56 +++++++----
 .../client/scheduler/BatchMutationCallTest.java    | 109 ++++++++++++++++++---
 .../client/scheduler/TestBatchMutationCall.java    |  32 +++++-
 3 files changed, 163 insertions(+), 34 deletions(-)

diff --git a/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java b/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java
index e5c6dee94..6c9a43ee2 100644
--- a/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java
+++ b/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java
@@ -42,7 +42,7 @@ import org.apache.bifromq.basescheduler.ICallTask;
 public abstract class BatchMutationCall<ReqT, RespT> implements 
IBatchCall<ReqT, RespT, MutationCallBatcherKey> {
     protected final MutationCallBatcherKey batcherKey;
     private final IMutationPipeline storePipeline;
-    private Deque<MutationCallTaskBatch<ReqT, RespT>> batchCallTasks = new 
ArrayDeque<>();
+    private Deque<ICallTask<ReqT, RespT, MutationCallBatcherKey>> 
pendingCallTasks = new ArrayDeque<>();
 
     protected BatchMutationCall(IMutationPipeline storePipeline, 
MutationCallBatcherKey batcherKey) {
         this.batcherKey = batcherKey;
@@ -51,20 +51,7 @@ public abstract class BatchMutationCall<ReqT, RespT> 
implements IBatchCall<ReqT,
 
     @Override
     public final void add(ICallTask<ReqT, RespT, MutationCallBatcherKey> 
callTask) {
-        MutationCallTaskBatch<ReqT, RespT> lastBatchCallTask;
-        MutationCallBatcherKey batcherKey = callTask.batcherKey();
-        assert callTask.batcherKey().id.equals(batcherKey.id);
-        if ((lastBatchCallTask = batchCallTasks.peekLast()) != null) {
-            if (!lastBatchCallTask.isBatchable(callTask)) {
-                lastBatchCallTask = newBatch(batcherKey.ver);
-                batchCallTasks.add(lastBatchCallTask);
-            }
-            lastBatchCallTask.add(callTask);
-        } else {
-            lastBatchCallTask = newBatch(batcherKey.ver);
-            lastBatchCallTask.add(callTask);
-            batchCallTasks.add(lastBatchCallTask);
-        }
+        pendingCallTasks.add(callTask);
     }
 
     protected MutationCallTaskBatch<ReqT, RespT> newBatch(long ver) {
@@ -81,25 +68,56 @@ public abstract class BatchMutationCall<ReqT, RespT> 
implements IBatchCall<ReqT,
     @Override
     public void reset(boolean abort) {
         if (abort) {
-            batchCallTasks = new ArrayDeque<>();
+            pendingCallTasks = new ArrayDeque<>();
         }
     }
 
     @Override
     public CompletableFuture<Void> execute() {
-        return execute(batchCallTasks);
+        return executeBatches();
     }
 
-    private CompletableFuture<Void> execute(Deque<MutationCallTaskBatch<ReqT, 
RespT>> batchCallTasks) {
+    private CompletableFuture<Void> executeBatches() {
         CompletableFuture<Void> chained = 
CompletableFuture.completedFuture(null);
         MutationCallTaskBatch<ReqT, RespT> batchCallTask;
-        while ((batchCallTask = batchCallTasks.poll()) != null) {
+        while ((batchCallTask = buildNextBatch()) != null) {
             MutationCallTaskBatch<ReqT, RespT> current = batchCallTask;
             chained = chained.thenCompose(v -> fireSingleBatch(current));
         }
         return chained;
     }
 
+    private MutationCallTaskBatch<ReqT, RespT> buildNextBatch() {
+        if (pendingCallTasks.isEmpty()) {
+            return null;
+        }
+        MutationCallTaskBatch<ReqT, RespT> batchCallTask = null;
+        long batchVer = -1;
+        int size = pendingCallTasks.size();
+        for (int i = 0; i < size; i++) {
+            ICallTask<ReqT, RespT, MutationCallBatcherKey> task = 
pendingCallTasks.pollFirst();
+            if (task == null) {
+                break;
+            }
+            if (batchCallTask == null) {
+                batchVer = task.batcherKey().ver;
+                batchCallTask = newBatch(batchVer);
+                batchCallTask.add(task);
+                continue;
+            }
+            if (task.batcherKey().ver != batchVer) {
+                pendingCallTasks.addLast(task);
+                continue;
+            }
+            if (batchCallTask.isBatchable(task)) {
+                batchCallTask.add(task);
+            } else {
+                pendingCallTasks.addLast(task);
+            }
+        }
+        return batchCallTask;
+    }
+
     private CompletableFuture<Void> 
fireSingleBatch(MutationCallTaskBatch<ReqT, RespT> batchCallTask) {
         RWCoProcInput input = makeBatch(batchCallTask.batchedTasks);
         long reqId = System.nanoTime();
diff --git a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java
index 72b1c3457..f13fb9114 100644
--- a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java
+++ b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java
@@ -30,6 +30,7 @@ import static org.testng.Assert.assertEquals;
 import com.google.protobuf.ByteString;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
@@ -77,9 +78,11 @@ public class BatchMutationCallTest {
 
     @Test
     public void addToSameBatch() {
-        when(storeClient.latestEffectiveRouter()).thenReturn(new 
TreeMap<>(BoundaryUtil::compare) {{
-            put(FULL_BOUNDARY, setting(id, "V1", 0));
-        }});
+        when(storeClient.latestEffectiveRouter()).thenReturn(new 
TreeMap<>(BoundaryUtil::compare) {
+            {
+                put(FULL_BOUNDARY, setting(id, "V1", 0));
+            }
+        });
 
         
when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
         when(mutationPipeline1.execute(any()))
@@ -103,7 +106,8 @@ public class BatchMutationCallTest {
             String[] keys = 
request.getRwCoProc().getRaw().toStringUtf8().split("_");
             assertEquals(keys.length, Sets.newSet(keys).size());
         }
-        // the resp order preserved
+        Collections.sort(reqList);
+        Collections.sort(respList);
         assertEquals(reqList, respList);
     }
 
@@ -124,19 +128,24 @@ public class BatchMutationCallTest {
             int req = ThreadLocalRandom.current().nextInt(1, 1001);
             reqList.add(req);
             if (req < 500) {
-                when(storeClient.latestEffectiveRouter()).thenReturn(new 
TreeMap<>(BoundaryUtil::compare) {{
-                    put(FULL_BOUNDARY, setting(id, "V1", 0));
-                }});
+                when(storeClient.latestEffectiveRouter()).thenReturn(new 
TreeMap<>(BoundaryUtil::compare) {
+                    {
+                        put(FULL_BOUNDARY, setting(id, "V1", 0));
+                    }
+                });
             } else {
-                when(storeClient.latestEffectiveRouter()).thenReturn(new 
TreeMap<>(BoundaryUtil::compare) {{
-                    put(FULL_BOUNDARY, setting(id, "V2", 0));
-                }});
+                when(storeClient.latestEffectiveRouter()).thenReturn(new 
TreeMap<>(BoundaryUtil::compare) {
+                    {
+                        put(FULL_BOUNDARY, setting(id, "V2", 0));
+                    }
+                });
             }
             
futures.add(scheduler.schedule(ByteString.copyFromUtf8(Integer.toString(req)))
                 .thenAccept((v) -> 
respList.add(Integer.parseInt(v.toStringUtf8()))));
         }
         
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
-        // the resp order preserved
+        Collections.sort(reqList);
+        Collections.sort(respList);
         assertEquals(reqList, respList);
     }
 
@@ -166,4 +175,82 @@ public class BatchMutationCallTest {
         
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
         assertEquals(execCount.get(), n);
     }
+
+    @Test
+    public void reScanWhenHitNonBatchable() {
+        when(storeClient.latestEffectiveRouter()).thenReturn(new 
TreeMap<>(BoundaryUtil::compare) {
+            {
+                put(FULL_BOUNDARY, setting(id, "V1", 0));
+            }
+        });
+        
when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
+        when(mutationPipeline1.execute(any()))
+            .thenReturn(CompletableFuture.supplyAsync(() -> 
KVRangeRWReply.newBuilder().build()));
+
+        MutationCallScheduler<ByteString, ByteString, TestBatchMutationCall> 
scheduler =
+            new MutationCallScheduler<>(NonBatchableBatchCall::new, 
Duration.ofMillis(1000).toNanos(), storeClient) {
+                @Override
+                protected ByteString rangeKey(ByteString call) {
+                    return call;
+                }
+            };
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        List<ByteString> reqs = List.of(
+            ByteString.copyFromUtf8("k1"),
+            ByteString.copyFromUtf8("k_dup"), // will mark non-batchable in 
first batch
+            ByteString.copyFromUtf8("k2"));
+        List<ByteString> resps = new CopyOnWriteArrayList<>();
+        reqs.forEach(req -> 
futures.add(scheduler.schedule(req).thenAccept(resps::add)));
+        
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
+        List<String> reqSorted = 
reqs.stream().map(ByteString::toStringUtf8).sorted().toList();
+        List<String> respSorted = 
resps.stream().map(ByteString::toStringUtf8).sorted().toList();
+        assertEquals(reqSorted, respSorted);
+    }
+
+    @Test
+    public void mixDifferentVersions() {
+        
when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
+        
when(storeClient.createMutationPipeline("V2")).thenReturn(mutationPipeline2);
+        when(mutationPipeline1.execute(any()))
+            .thenReturn(CompletableFuture.supplyAsync(() -> 
KVRangeRWReply.newBuilder().build()));
+        when(mutationPipeline2.execute(any()))
+            .thenReturn(CompletableFuture.supplyAsync(() -> 
KVRangeRWReply.newBuilder().build()));
+        TestMutationCallScheduler scheduler = new 
TestMutationCallScheduler(storeClient, Duration.ofMillis(1000));
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        List<ByteString> reqs = new ArrayList<>();
+        List<ByteString> resps = new CopyOnWriteArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            ByteString req = ByteString.copyFromUtf8("k" + i);
+            reqs.add(req);
+            if (i % 2 == 0) {
+                when(storeClient.latestEffectiveRouter()).thenReturn(new 
TreeMap<>(BoundaryUtil::compare) {
+                    {
+                        put(FULL_BOUNDARY, setting(id, "V1", 0));
+                    }
+                });
+            } else {
+                when(storeClient.latestEffectiveRouter()).thenReturn(new 
TreeMap<>(BoundaryUtil::compare) {
+                    {
+                        put(FULL_BOUNDARY, setting(id, "V2", 1));
+                    }
+                });
+            }
+            futures.add(scheduler.schedule(req).thenAccept(resps::add));
+        }
+        
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
+        List<String> reqSorted = 
reqs.stream().map(ByteString::toStringUtf8).sorted().toList();
+        List<String> respSorted = 
resps.stream().map(ByteString::toStringUtf8).sorted().toList();
+        assertEquals(reqSorted, respSorted);
+    }
+
+    private static class NonBatchableBatchCall extends TestBatchMutationCall {
+        protected NonBatchableBatchCall(IMutationPipeline pipeline, 
MutationCallBatcherKey batcherKey) {
+            super(pipeline, batcherKey);
+        }
+
+        @Override
+        protected NonBatchableFirstBatch newBatch(long ver) {
+            return new NonBatchableFirstBatch(ver);
+        }
+    }
 }
diff --git a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java
index 276634b09..cbb06fff6 100644
--- a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java
+++ b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java
@@ -19,16 +19,16 @@
 
 package org.apache.bifromq.basekv.client.scheduler;
 
-import org.apache.bifromq.basekv.client.IMutationPipeline;
-import org.apache.bifromq.basekv.store.proto.RWCoProcInput;
-import org.apache.bifromq.basekv.store.proto.RWCoProcOutput;
-import org.apache.bifromq.basescheduler.ICallTask;
 import com.google.common.collect.Iterables;
 import com.google.protobuf.ByteString;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Queue;
 import java.util.Set;
+import org.apache.bifromq.basekv.client.IMutationPipeline;
+import org.apache.bifromq.basekv.store.proto.RWCoProcInput;
+import org.apache.bifromq.basekv.store.proto.RWCoProcOutput;
+import org.apache.bifromq.basescheduler.ICallTask;
 
 public class TestBatchMutationCall extends BatchMutationCall<ByteString, 
ByteString> {
     protected TestBatchMutationCall(IMutationPipeline pipeline, 
MutationCallBatcherKey batcherKey) {
@@ -86,4 +86,28 @@ public class TestBatchMutationCall extends 
BatchMutationCall<ByteString, ByteStr
             return !keys.contains(callTask.call());
         }
     }
+
+    static class NonBatchableFirstBatch extends 
MutationCallTaskBatch<ByteString, ByteString> {
+        private final Set<ByteString> keys = new HashSet<>();
+        private boolean sawNonBatchable;
+
+        protected NonBatchableFirstBatch(long ver) {
+            super(ver);
+        }
+
+        @Override
+        protected void add(ICallTask<ByteString, ByteString, 
MutationCallBatcherKey> callTask) {
+            super.add(callTask);
+            keys.add(callTask.call());
+        }
+
+        @Override
+        protected boolean isBatchable(ICallTask<ByteString, ByteString, 
MutationCallBatcherKey> callTask) {
+            if (!sawNonBatchable && 
callTask.call().toStringUtf8().contains("dup")) {
+                sawNonBatchable = true;
+                return false;
+            }
+            return !keys.contains(callTask.call());
+        }
+    }
 }

Reply via email to