This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_add_peer
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_add_peer by this push:
     new 593b62e166 complete the part for MultiLeaderServerImpl to add new Peer 
sync channel
593b62e166 is described below

commit 593b62e166b75c5a3617c0c48a9cd6f89cd9c75d
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Aug 31 17:57:43 2022 +0800

    complete the part for MultiLeaderServerImpl to add new Peer sync channel
---
 .../multileader/MultiLeaderConsensus.java          | 12 +++
 .../multileader/MultiLeaderServerImpl.java         | 10 +++
 .../multileader/logdispatcher/IndexController.java |  9 ++-
 .../multileader/logdispatcher/LogDispatcher.java   | 90 ++++++++++++++++------
 .../service/MultiLeaderRPCServiceProcessor.java    |  7 ++
 .../logdispatcher/IndexControllerTest.java         |  8 +-
 .../multileader/logdispatcher/SyncStatusTest.java  |  8 +-
 .../src/main/thrift/mutlileader.thrift             | 10 +++
 8 files changed, 119 insertions(+), 35 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 33e59e361b..033303442b 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -230,6 +230,18 @@ public class MultiLeaderConsensus implements IConsensus {
 
   @Override
   public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) 
{
+    MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
+    if (impl == null) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+    // step 1: inactive new Peer to prepare for following steps
+    impl.inactivePeer(peer);
+
+    // step 2: notify all the other Peers to build the sync connection to 
newPeer
+    impl.notifyPeersToBuildSyncLogChannel(peer);
+
     return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
   }
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index c5e88aab16..9fc28bc826 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -183,6 +183,16 @@ public class MultiLeaderServerImpl {
     stateMachine.loadSnapshot(latestSnapshotRootDir);
   }
 
+  public void inactivePeer(Peer peer) {
+    // TODO: (xingtanzjr) investigate how to implement here smoothly using 
sync/async client
+  }
+
+  public void notifyPeersToBuildSyncLogChannel(Peer targetPeer) {}
+
+  public void buildSyncLogChannel(Peer targetPeer) {
+
+  }
+
   public void persistConfiguration() {
     try (PublicBAOS publicBAOS = new PublicBAOS();
         DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
index 62cc0e5894..26f2a9c002 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexController.java
@@ -47,10 +47,12 @@ public class IndexController {
 
   private final String storageDir;
   private final String prefix;
+  private final long initialIndex;
 
-  public IndexController(String storageDir, String prefix) {
+  public IndexController(String storageDir, String prefix, long initialIndex) {
     this.storageDir = storageDir;
     this.prefix = prefix + '-';
+    this.initialIndex = initialIndex;
     restore();
   }
 
@@ -137,10 +139,13 @@ public class IndexController {
       }
       currentIndex = lastFlushedIndex;
     } else {
-      versionFile = new File(directory, prefix + "0");
+      currentIndex = initialIndex;
+      versionFile = new File(directory, prefix + initialIndex);
       try {
         Files.createFile(versionFile.toPath());
       } catch (IOException e) {
+        // TODO: (xingtanzjr) we need to handle the situation that file 
creation failed.
+        //  Or the dispatcher won't run correctly
         logger.error("Error occurred when creating new file {}", 
versionFile.getAbsolutePath(), e);
       }
     }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index e3539752be..97cee9c3c2 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -55,14 +55,15 @@ import java.util.stream.Collectors;
 
 /** Manage all asynchronous replication threads and corresponding async 
clients */
 public class LogDispatcher {
-
   private final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
-
+  private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
   private final MultiLeaderServerImpl impl;
   private final List<LogDispatcherThread> threads;
   private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> 
clientManager;
   private ExecutorService executorService;
 
+  private boolean stopped = false;
+
   public LogDispatcher(
       MultiLeaderServerImpl impl,
       IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager) {
@@ -71,22 +72,26 @@ public class LogDispatcher {
     this.threads =
         impl.getConfiguration().stream()
             .filter(x -> !Objects.equals(x, impl.getThisNode()))
-            .map(x -> new LogDispatcherThread(x, impl.getConfig()))
+            .map(x -> new LogDispatcherThread(x, impl.getConfig(), 
DEFAULT_INITIAL_SYNC_INDEX))
             .collect(Collectors.toList());
     if (!threads.isEmpty()) {
+      // We use cached thread pool here because each LogDispatcherThread will 
occupy one thread.
+      // And every LogDispatcherThread won't release its thread in this pool 
because it won't stop
+      // unless LogDispatcher stop.
+      // Thus, the size of this threadPool will be the same as the count of 
LogDispatcherThread.
       this.executorService =
-          IoTDBThreadPoolFactory.newFixedThreadPool(
-              threads.size(), "LogDispatcher-" + 
impl.getThisNode().getGroupId());
+          IoTDBThreadPoolFactory.newCachedThreadPool(
+              "LogDispatcher-" + impl.getThisNode().getGroupId());
     }
   }
 
-  public void start() {
+  public synchronized void start() {
     if (!threads.isEmpty()) {
       threads.forEach(executorService::submit);
     }
   }
 
-  public void stop() {
+  public synchronized void stop() {
     if (!threads.isEmpty()) {
       threads.forEach(LogDispatcherThread::stop);
       executorService.shutdownNow();
@@ -100,31 +105,64 @@ public class LogDispatcher {
         logger.error("Unexpected Interruption when closing LogDispatcher 
service ");
       }
     }
+    stopped = true;
   }
 
-  public OptionalLong getMinSyncIndex() {
+  public synchronized void addLogDispatcherThread(Peer peer, long 
initialIndex) {
+    if (stopped) {
+      return;
+    }
+    //
+    LogDispatcherThread thread = new LogDispatcherThread(peer, 
impl.getConfig(), initialIndex);
+    threads.add(thread);
+    executorService.submit(thread);
+  }
+
+  public synchronized void removeLogDispatcherThread(Peer peer) {
+    if (stopped) {
+      return;
+    }
+    int threadIndex = -1;
+    for (int i = 0; i < threads.size(); i++) {
+      if 
(threads.get(i).peer.getEndpoint().getIp().equals(peer.getEndpoint().getIp())) {
+        threadIndex = i;
+        break;
+      }
+    }
+    if (threadIndex == -1) {
+      return;
+    }
+    threads.get(threadIndex).stop();
+    threads.remove(threadIndex);
+  }
+
+  public synchronized OptionalLong getMinSyncIndex() {
     return 
threads.stream().mapToLong(LogDispatcherThread::getCurrentSyncIndex).min();
   }
 
   public void offer(IndexedConsensusRequest request) {
     List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
-    threads.forEach(
-        thread -> {
-          logger.debug(
-              "{}->{}: Push a log to the queue, where the queue length is {}",
-              impl.getThisNode().getGroupId(),
-              thread.getPeer().getEndpoint().getIp(),
-              thread.getPendingRequest().size());
-          if (!thread
-              .getPendingRequest()
-              .offer(new IndexedConsensusRequest(serializedRequests, 
request.getSearchIndex()))) {
+    // we put the serialization step outside the synchronized block because it 
is stateless and
+    // time-consuming
+    synchronized (this) {
+      threads.forEach(
+          thread -> {
             logger.debug(
-                "{}: Log queue of {} is full, ignore the log to this node, 
searchIndex: {}",
+                "{}->{}: Push a log to the queue, where the queue length is 
{}",
                 impl.getThisNode().getGroupId(),
-                thread.getPeer(),
-                request.getSearchIndex());
-          }
-        });
+                thread.getPeer().getEndpoint().getIp(),
+                thread.getPendingRequest().size());
+            if (!thread
+                .getPendingRequest()
+                .offer(new IndexedConsensusRequest(serializedRequests, 
request.getSearchIndex()))) {
+              logger.debug(
+                  "{}: Log queue of {} is full, ignore the log to this node, 
searchIndex: {}",
+                  impl.getThisNode().getGroupId(),
+                  thread.getPeer(),
+                  request.getSearchIndex());
+            }
+          });
+    }
   }
 
   public class LogDispatcherThread implements Runnable {
@@ -146,14 +184,16 @@ public class LogDispatcher {
     private ConsensusReqReader.ReqIterator walEntryiterator;
     private long iteratorIndex = 1;
 
-    public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
+    public LogDispatcherThread(Peer peer, MultiLeaderConfig config, long 
initialSyncIndex) {
       this.peer = peer;
       this.config = config;
       this.pendingRequest =
           new 
ArrayBlockingQueue<>(config.getReplication().getMaxPendingRequestNumPerNode());
       this.controller =
           new IndexController(
-              impl.getStorageDir(), 
Utils.fromTEndPointToString(peer.getEndpoint()));
+              impl.getStorageDir(),
+              Utils.fromTEndPointToString(peer.getEndpoint()),
+              initialSyncIndex);
       this.syncStatus = new SyncStatus(controller, config);
       this.walEntryiterator = reader.getReqIterator(iteratorIndex);
     }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 201d04ab35..ec8e3f5789 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -31,6 +31,8 @@ import 
org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
 import 
org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
 import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
 import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
+import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
 import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
 import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
@@ -165,5 +167,10 @@ public class MultiLeaderRPCServiceProcessor implements 
MultiLeaderConsensusIServ
         new TActivatePeerRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
   }
 
+  @Override
+  public void buildSyncLogChannel(
+      TBuildSyncLogChannelReq req, 
AsyncMethodCallback<TBuildSyncLogChannelRes> resultHandler)
+      throws TException {}
+
   public void handleClientExit() {}
 }
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
index aa6f63783f..1f2c228f11 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/IndexControllerTest.java
@@ -46,7 +46,7 @@ public class IndexControllerTest {
   /** test indexController when incrementIntervalAfterRestart == false */
   @Test
   public void testIncrementIntervalAfterRestart() {
-    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix);
+    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix, 0);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -55,7 +55,7 @@ public class IndexControllerTest {
     Assert.assertEquals(IndexController.FLUSH_INTERVAL - 1, 
controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix);
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -63,7 +63,7 @@ public class IndexControllerTest {
     Assert.assertEquals(IndexController.FLUSH_INTERVAL + 1, 
controller.getCurrentIndex());
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix);
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0);
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getCurrentIndex());
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getLastFlushedIndex());
 
@@ -71,7 +71,7 @@ public class IndexControllerTest {
     Assert.assertEquals(IndexController.FLUSH_INTERVAL * 2 - 1, 
controller.getCurrentIndex());
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getLastFlushedIndex());
 
-    controller = new IndexController(storageDir.getAbsolutePath(), prefix);
+    controller = new IndexController(storageDir.getAbsolutePath(), prefix, 0);
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getCurrentIndex());
     Assert.assertEquals(IndexController.FLUSH_INTERVAL, 
controller.getLastFlushedIndex());
 
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index db6b03377a..4f94aafeaa 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -54,7 +54,7 @@ public class SyncStatusTest {
   /** Confirm success from front to back */
   @Test
   public void sequenceTest() throws InterruptedException {
-    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix);
+    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix, 0);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);
@@ -79,7 +79,7 @@ public class SyncStatusTest {
   /** Confirm success from back to front */
   @Test
   public void reverseTest() throws InterruptedException {
-    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix);
+    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix, 0);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -111,7 +111,7 @@ public class SyncStatusTest {
   /** Confirm success first from front to back, then back to front */
   @Test
   public void mixedTest() throws InterruptedException {
-    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix);
+    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix, 0);
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -155,7 +155,7 @@ public class SyncStatusTest {
   /** Test Blocking while addNextBatch */
   @Test
   public void waitTest() throws InterruptedException, ExecutionException {
-    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix);
+    IndexController controller = new 
IndexController(storageDir.getAbsolutePath(), prefix, 0);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
     SyncStatus status = new SyncStatus(controller, config);
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift 
b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 2b9d7ac587..993eb40ba7 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -51,8 +51,18 @@ struct TSyncLogRes {
   1: required list<common.TSStatus> status
 }
 
+struct TBuildSyncLogChannelReq {
+  1: required common.TConsensusGroupId consensusGroupId
+  2: required common.TEndPoint endPoint
+}
+
+struct TBuildSyncLogChannelRes {
+  1: required list<common.TSStatus> status
+}
+
 service MultiLeaderConsensusIService {
   TSyncLogRes syncLog(TSyncLogReq req)
   TInactivatePeerRes inactivatePeer(TInactivatePeerReq req)
   TActivatePeerRes activatePeer(TActivatePeerReq req)
+  TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
 }
\ No newline at end of file

Reply via email to