This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_test_1_async
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2126e2febc731d657ca6a112b87bc9cbd13a9eb6
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Aug 3 16:56:58 2022 +0800

    leverage client insert thread to do serialization of request for dispatching
---
 .../common/request/IndexedConsensusRequest.java    | 19 +++++++++++++-
 .../multileader/logdispatcher/LogDispatcher.java   | 29 +++++++++++++++++++---
 2 files changed, 43 insertions(+), 5 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index de3aca433b..1c004264dd 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.common.request;
 
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 
@@ -29,13 +30,19 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
   /** we do not need to serialize these two fields as they are useless in 
other nodes. */
   private final long searchIndex;
 
-  private final List<IConsensusRequest> requests;
+  private List<IConsensusRequest> requests;
+  private List<ByteBuffer> serializedRequests;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> 
requests) {
     this.searchIndex = searchIndex;
     this.requests = requests;
   }
 
+  public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long 
searchIndex) {
+    this.searchIndex = searchIndex;
+    this.serializedRequests = serializedRequests;
+  }
+
   @Override
   public ByteBuffer serializeToByteBuffer() {
     throw new UnsupportedOperationException();
@@ -49,6 +56,16 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
     return searchIndex;
   }
 
+  public List<ByteBuffer> getSerializedRequests() {
+    return serializedRequests;
+  }
+
+  public List<ByteBuffer> buildSerializedRequests() {
+    List<ByteBuffer> result = new LinkedList<>();
+    this.requests.forEach(r -> result.add(r.serializeToByteBuffer()));
+    return result;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index ed41ec5e16..da5ad41aaa 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -106,17 +107,27 @@ public class LogDispatcher {
   }
 
   public void offer(IndexedConsensusRequest request) {
+    List<ByteBuffer> serializedRequest = request.buildSerializedRequests();
     threads.forEach(
         thread -> {
           logger.debug(
               "{}: Push a log to the queue, where the queue length is {}",
               impl.getThisNode().getGroupId(),
               thread.getPendingRequest().size());
-          if (!thread.getPendingRequest().offer(request)) {
+          if (!thread
+              .getPendingRequest()
+              .offer(new IndexedConsensusRequest(serializedRequest, 
request.getSearchIndex()))) {
+            logger.info(
+                "{}: Log queue to {} is full. skip current request: {}",
+                impl.getThisNode().getGroupId(),
+                thread.getPeer().getEndpoint().getIp(),
+                request.getSearchIndex());
             logger.debug(
                 "{}: Log queue of {} is full, ignore the log to this node",
                 impl.getThisNode().getGroupId(),
                 thread.getPeer());
+          } else {
+            thread.countQueueUsage(request.getSearchIndex());
           }
         });
   }
@@ -139,6 +150,7 @@ public class LogDispatcher {
 
     private ConsensusReqReader.ReqIterator walEntryiterator;
     private long iteratorIndex = 1;
+    private long queueProcessCount = 0;
 
     public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
       this.peer = peer;
@@ -156,6 +168,16 @@ public class LogDispatcher {
       return controller;
     }
 
+    public void countQueueUsage(long searchIndex) {
+      this.queueProcessCount++;
+      logger.info(
+          "{}: queue to {}: put request to queue. count: {}, searchIndex {}",
+          impl.getThisNode().getGroupId(),
+          getPeer().getEndpoint().getIp(),
+          this.queueProcessCount,
+          searchIndex);
+    }
+
     public long getCurrentSyncIndex() {
       return controller.getCurrentIndex();
     }
@@ -355,9 +377,8 @@ public class LogDispatcher {
 
     private void constructBatchIndexedFromConsensusRequest(
         IndexedConsensusRequest request, List<TLogBatch> logBatches) {
-      for (IConsensusRequest innerRequest : request.getRequests()) {
-        logBatches.add(
-            new TLogBatch(innerRequest.serializeToByteBuffer(), 
request.getSearchIndex(), false));
+      for (ByteBuffer innerRequest : request.getSerializedRequests()) {
+        logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), 
false));
       }
     }
   }

Reply via email to