This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4fba35edd93f72575673e889a698a2df5eb31926
Author: 133tosakarin <[email protected]>
AuthorDate: Fri Aug 2 09:47:34 2024 +0800

    [fix] Change IoTConsensusService and PipeConsensusService from async  to 
sync  (#13077)
    
    (cherry picked from commit 0a611a8c7c45e3d4f6a27c4f39860dbf4289f0a1)
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |   2 +-
 .../iot/service/IoTConsensusRPCService.java        |  14 +-
 .../service/IoTConsensusRPCServiceProcessor.java   | 185 +++++++++------------
 .../apache/iotdb/consensus/pipe/PipeConsensus.java |   2 +-
 .../pipe/service/PipeConsensusRPCService.java      |  15 +-
 .../service/PipeConsensusRPCServiceProcessor.java  |  64 +++----
 .../iotdb/commons/service/ThriftService.java       |  15 +-
 7 files changed, 111 insertions(+), 186 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index b94781761e9..464311dd94d 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -133,7 +133,7 @@ public class IoTConsensus implements IConsensus {
   @Override
   public synchronized void start() throws IOException {
     initAndRecover();
-    service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
+    service.initSyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
     try {
       registerManager.register(service);
     } catch (StartupException e) {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
index 1be3b67b3b1..8f7c484af4b 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
 import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
 
-import org.apache.thrift.TBaseAsyncProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,10 +53,10 @@ public class IoTConsensusRPCService extends ThriftService 
implements IoTConsensu
   }
 
   @Override
-  public void initAsyncedServiceImpl(Object iotConsensusRPCServiceProcessor) {
+  public void initSyncedServiceImpl(Object iotConsensusRPCServiceProcessor) {
     this.iotConsensusRPCServiceProcessor =
         (IoTConsensusRPCServiceProcessor) iotConsensusRPCServiceProcessor;
-    super.initAsyncedServiceImpl(this.iotConsensusRPCServiceProcessor);
+    super.initSyncedServiceImpl(iotConsensusRPCServiceProcessor);
   }
 
   @Override
@@ -67,7 +66,7 @@ public class IoTConsensusRPCService extends ThriftService 
implements IoTConsensu
           InstantiationException,
           NoSuchMethodException,
           InvocationTargetException {
-    processor = new 
IoTConsensusIService.AsyncProcessor<>(iotConsensusRPCServiceProcessor);
+    processor = new 
IoTConsensusIService.Processor<>(iotConsensusRPCServiceProcessor);
   }
 
   @Override
@@ -76,20 +75,15 @@ public class IoTConsensusRPCService extends ThriftService 
implements IoTConsensu
     try {
       thriftServiceThread =
           new ThriftServiceThread(
-              (TBaseAsyncProcessor<?>) processor,
+              processor,
               getID().getName(),
               ThreadName.IOT_CONSENSUS_RPC_PROCESSOR.getName(),
               getBindIP(),
               getBindPort(),
-              config.getRpc().getRpcSelectorThreadNum(),
-              config.getRpc().getRpcMinConcurrentClientNum(),
               config.getRpc().getRpcMaxConcurrentClientNum(),
               config.getRpc().getThriftServerAwaitTimeForStopService(),
               new 
IoTConsensusRPCServiceHandler(iotConsensusRPCServiceProcessor),
               config.getRpc().isRpcThriftCompressionEnabled(),
-              config.getRpc().getConnectionTimeoutInMs(),
-              config.getRpc().getThriftMaxFrameSize(),
-              ThriftServiceThread.ServerType.SELECTOR,
               ZeroCopyRpcTransportFactory.INSTANCE);
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index e29d48fed7c..a609acd2651 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -55,14 +55,13 @@ import 
org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.stream.Collectors;
 
-public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.AsyncIface {
+public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Iface {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(IoTConsensusRPCServiceProcessor.class);
@@ -74,98 +73,85 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
   }
 
   @Override
-  public void syncLogEntries(
-      TSyncLogEntriesReq req, AsyncMethodCallback<TSyncLogEntriesRes> 
resultHandler) {
-    try {
-      ConsensusGroupId groupId =
-          
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-      IoTConsensusServerImpl impl = consensus.getImpl(groupId);
-      if (impl == null) {
-        String message =
-            String.format(
-                "unexpected consensusGroupId %s for TSyncLogEntriesReq which 
size is %s",
-                groupId, req.getLogEntries().size());
-        LOGGER.error(message);
-        TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-        status.setMessage(message);
-        resultHandler.onComplete(new 
TSyncLogEntriesRes(Collections.singletonList(status)));
-        return;
-      }
-      if (impl.isReadOnly()) {
-        String message = "fail to sync logEntries because system is 
read-only.";
-        LOGGER.error(message);
-        TSStatus status = new 
TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
-        status.setMessage(message);
-        resultHandler.onComplete(new 
TSyncLogEntriesRes(Collections.singletonList(status)));
-        return;
-      }
-      if (!impl.isActive()) {
-        TSStatus status = new 
TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
-        status.setMessage("peer is inactive and not ready to receive sync log 
request");
-        resultHandler.onComplete(new 
TSyncLogEntriesRes(Collections.singletonList(status)));
-        return;
-      }
-      BatchIndexedConsensusRequest logEntriesInThisBatch =
-          new BatchIndexedConsensusRequest(req.peerId);
-      // We use synchronized to ensure atomicity of executing multiple logs
-      for (TLogEntry entry : req.getLogEntries()) {
-        logEntriesInThisBatch.add(
-            impl.buildIndexedConsensusRequestForRemoteRequest(
-                entry.getSearchIndex(),
-                entry.getData().stream()
-                    .map(
-                        entry.isFromWAL()
-                            ? IoTConsensusRequest::new
-                            : ByteBufferConsensusRequest::new)
-                    .collect(Collectors.toList())));
-      }
-      long buildRequestTime = System.nanoTime();
-      IConsensusRequest deserializedRequest =
-          impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
-      impl.getIoTConsensusServerMetrics()
-          .recordDeserializeCost(System.nanoTime() - buildRequestTime);
-      TSStatus writeStatus =
-          impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), 
deserializedRequest);
-      LOGGER.debug(
-          "execute TSyncLogEntriesReq for {} with result {}",
-          req.consensusGroupId,
-          writeStatus.subStatus);
-      resultHandler.onComplete(new TSyncLogEntriesRes(writeStatus.subStatus));
-    } catch (Exception e) {
-      resultHandler.onError(e);
+  public TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req) {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    IoTConsensusServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format(
+              "unexpected consensusGroupId %s for TSyncLogEntriesReq which 
size is %s",
+              groupId, req.getLogEntries().size());
+      LOGGER.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      return new TSyncLogEntriesRes(Collections.singletonList(status));
     }
+    if (impl.isReadOnly()) {
+      String message = "fail to sync logEntries because system is read-only.";
+      LOGGER.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
+      status.setMessage(message);
+      return new TSyncLogEntriesRes(Collections.singletonList(status));
+    }
+    if (!impl.isActive()) {
+      TSStatus status = new 
TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
+      status.setMessage("peer is inactive and not ready to receive sync log 
request");
+      return new TSyncLogEntriesRes(Collections.singletonList(status));
+    }
+    BatchIndexedConsensusRequest logEntriesInThisBatch =
+        new BatchIndexedConsensusRequest(req.peerId);
+    // We use synchronized to ensure atomicity of executing multiple logs
+    for (TLogEntry entry : req.getLogEntries()) {
+      logEntriesInThisBatch.add(
+          impl.buildIndexedConsensusRequestForRemoteRequest(
+              entry.getSearchIndex(),
+              entry.getData().stream()
+                  .map(
+                      entry.isFromWAL()
+                          ? IoTConsensusRequest::new
+                          : ByteBufferConsensusRequest::new)
+                  .collect(Collectors.toList())));
+    }
+    long buildRequestTime = System.nanoTime();
+    IConsensusRequest deserializedRequest =
+        impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
+    
impl.getIoTConsensusServerMetrics().recordDeserializeCost(System.nanoTime() - 
buildRequestTime);
+    TSStatus writeStatus =
+        impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), 
deserializedRequest);
+    LOGGER.debug(
+        "execute TSyncLogEntriesReq for {} with result {}",
+        req.consensusGroupId,
+        writeStatus.subStatus);
+    return new TSyncLogEntriesRes(writeStatus.subStatus);
   }
 
   @Override
-  public void inactivatePeer(
-      TInactivatePeerReq req, AsyncMethodCallback<TInactivatePeerRes> 
resultHandler)
-      throws TException {
+  public TInactivatePeerRes inactivatePeer(TInactivatePeerReq req) throws 
TException {
     if (req.isForDeletionPurpose()) {
       
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.BEFORE_INACTIVATE);
     }
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     IoTConsensusServerImpl impl = consensus.getImpl(groupId);
+
     if (impl == null) {
       String message =
           String.format("unexpected consensusGroupId %s for inactivatePeer 
request", groupId);
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TInactivatePeerRes(status));
-      return;
+      return new TInactivatePeerRes(status);
     }
     impl.setActive(false);
-    resultHandler.onComplete(
-        new TInactivatePeerRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
     if (req.isForDeletionPurpose()) {
       
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.AFTER_INACTIVATE);
     }
+    return new TInactivatePeerRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
   @Override
-  public void activatePeer(
-      TActivatePeerReq req, AsyncMethodCallback<TActivatePeerRes> 
resultHandler) throws TException {
+  public TActivatePeerRes activatePeer(TActivatePeerReq req) throws TException 
{
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     IoTConsensusServerImpl impl = consensus.getImpl(groupId);
@@ -175,18 +161,15 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TActivatePeerRes(status));
-      return;
+      return new TActivatePeerRes(status);
     }
     KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
     impl.setActive(true);
-    resultHandler.onComplete(
-        new TActivatePeerRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+    return new TActivatePeerRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
   @Override
-  public void buildSyncLogChannel(
-      TBuildSyncLogChannelReq req, 
AsyncMethodCallback<TBuildSyncLogChannelRes> resultHandler)
+  public TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq 
req)
       throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -197,8 +180,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TBuildSyncLogChannelRes(status));
-      return;
+      return new TBuildSyncLogChannelRes(status);
     }
     TSStatus responseStatus;
     try {
@@ -208,12 +190,11 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
-    resultHandler.onComplete(new TBuildSyncLogChannelRes(responseStatus));
+    return new TBuildSyncLogChannelRes(responseStatus);
   }
 
   @Override
-  public void removeSyncLogChannel(
-      TRemoveSyncLogChannelReq req, 
AsyncMethodCallback<TRemoveSyncLogChannelRes> resultHandler)
+  public TRemoveSyncLogChannelRes 
removeSyncLogChannel(TRemoveSyncLogChannelReq req)
       throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -224,8 +205,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TRemoveSyncLogChannelRes(status));
-      return;
+      return new TRemoveSyncLogChannelRes(status);
     }
     TSStatus responseStatus;
     try {
@@ -235,12 +215,11 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
-    resultHandler.onComplete(new TRemoveSyncLogChannelRes(responseStatus));
+    return new TRemoveSyncLogChannelRes(responseStatus);
   }
 
   @Override
-  public void waitSyncLogComplete(
-      TWaitSyncLogCompleteReq req, 
AsyncMethodCallback<TWaitSyncLogCompleteRes> resultHandler)
+  public TWaitSyncLogCompleteRes waitSyncLogComplete(TWaitSyncLogCompleteReq 
req)
       throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -251,18 +230,15 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
-      return;
+      return new TWaitSyncLogCompleteRes(true, 0, 0);
     }
     long searchIndex = impl.getSearchIndex();
     long safeIndex = impl.getMinSyncIndex();
-    resultHandler.onComplete(
-        new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, 
safeIndex));
+    return new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, 
safeIndex);
   }
 
   @Override
-  public void sendSnapshotFragment(
-      TSendSnapshotFragmentReq req, 
AsyncMethodCallback<TSendSnapshotFragmentRes> resultHandler)
+  public TSendSnapshotFragmentRes 
sendSnapshotFragment(TSendSnapshotFragmentReq req)
       throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -273,8 +249,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TSendSnapshotFragmentRes(status));
-      return;
+      return new TSendSnapshotFragmentRes(status);
     }
     TSStatus responseStatus;
     try {
@@ -284,12 +259,11 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
-    resultHandler.onComplete(new TSendSnapshotFragmentRes(responseStatus));
+    return new TSendSnapshotFragmentRes(responseStatus);
   }
 
   @Override
-  public void triggerSnapshotLoad(
-      TTriggerSnapshotLoadReq req, 
AsyncMethodCallback<TTriggerSnapshotLoadRes> resultHandler)
+  public TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq 
req)
       throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -300,20 +274,16 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TTriggerSnapshotLoadRes(status));
-      return;
+      return new TTriggerSnapshotLoadRes(status);
     }
     impl.loadSnapshot(req.snapshotId);
     KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
-    resultHandler.onComplete(
-        new TTriggerSnapshotLoadRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+    return new TTriggerSnapshotLoadRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
   @Override
-  public void cleanupTransferredSnapshot(
-      TCleanupTransferredSnapshotReq req,
-      AsyncMethodCallback<TCleanupTransferredSnapshotRes> resultHandler)
-      throws TException {
+  public TCleanupTransferredSnapshotRes cleanupTransferredSnapshot(
+      TCleanupTransferredSnapshotReq req) throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     IoTConsensusServerImpl impl = consensus.getImpl(groupId);
@@ -323,8 +293,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TCleanupTransferredSnapshotRes(status));
-      return;
+      return new TCleanupTransferredSnapshotRes(status);
     }
     TSStatus responseStatus;
     try {
@@ -335,7 +304,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
-    resultHandler.onComplete(new 
TCleanupTransferredSnapshotRes(responseStatus));
+    return new TCleanupTransferredSnapshotRes(responseStatus);
   }
 
   public void handleClientExit() {}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index a6d87669269..7cd0470f977 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -116,7 +116,7 @@ public class PipeConsensus implements IConsensus {
   public synchronized void start() throws IOException {
     initAndRecover();
 
-    rpcService.initAsyncedServiceImpl(new 
PipeConsensusRPCServiceProcessor(this, config.getPipe()));
+    rpcService.initSyncedServiceImpl(new 
PipeConsensusRPCServiceProcessor(this, config.getPipe()));
     try {
       registerManager.register(rpcService);
     } catch (StartupException e) {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java
index 83c80574e32..66bded8d13d 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java
@@ -29,8 +29,6 @@ import org.apache.iotdb.consensus.config.PipeConsensusConfig;
 import org.apache.iotdb.consensus.pipe.thrift.PipeConsensusIService;
 import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
 
-import org.apache.thrift.TBaseAsyncProcessor;
-
 public class PipeConsensusRPCService extends ThriftService implements 
PipeConsensusRPCServiceMBean {
 
   private final TEndPoint thisNode;
@@ -48,15 +46,15 @@ public class PipeConsensusRPCService extends ThriftService 
implements PipeConsen
   }
 
   @Override
-  public void initAsyncedServiceImpl(Object pipeConsensusRPCServiceProcessor) {
+  public void initSyncedServiceImpl(Object pipeConsensusRPCServiceProcessor) {
     this.pipeConsensusRPCServiceProcessor =
         (PipeConsensusRPCServiceProcessor) pipeConsensusRPCServiceProcessor;
-    super.initAsyncedServiceImpl(this.pipeConsensusRPCServiceProcessor);
+    super.initSyncedServiceImpl(this.pipeConsensusRPCServiceProcessor);
   }
 
   @Override
   public void initTProcessor() {
-    processor = new 
PipeConsensusIService.AsyncProcessor<>(pipeConsensusRPCServiceProcessor);
+    processor = new 
PipeConsensusIService.Processor<>(pipeConsensusRPCServiceProcessor);
   }
 
   @Override
@@ -64,20 +62,15 @@ public class PipeConsensusRPCService extends ThriftService 
implements PipeConsen
     try {
       thriftServiceThread =
           new ThriftServiceThread(
-              (TBaseAsyncProcessor<?>) processor,
+              processor,
               getID().getName(),
               ThreadName.PIPE_CONSENSUS_RPC_PROCESSOR.getName(),
               getBindIP(),
               getBindPort(),
-              config.getRpc().getRpcSelectorThreadNum(),
-              config.getRpc().getRpcMinConcurrentClientNum(),
               config.getRpc().getRpcMaxConcurrentClientNum(),
               config.getRpc().getThriftServerAwaitTimeForStopService(),
               new 
PipeConsensusRPCServiceHandler(pipeConsensusRPCServiceProcessor),
               config.getRpc().isRpcThriftCompressionEnabled(),
-              config.getRpc().getConnectionTimeoutInMs(),
-              config.getRpc().getThriftMaxFrameSize(),
-              ThriftServiceThread.ServerType.SELECTOR,
               ZeroCopyRpcTransportFactory.INSTANCE);
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
index e41fffb58e0..84a7cd8dbe3 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
@@ -43,11 +43,10 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PipeConsensusRPCServiceProcessor implements 
PipeConsensusIService.AsyncIface {
+public class PipeConsensusRPCServiceProcessor implements 
PipeConsensusIService.Iface {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeConsensusRPCServiceProcessor.class);
   private final PipeConsensus pipeConsensus;
@@ -61,28 +60,19 @@ public class PipeConsensusRPCServiceProcessor implements 
PipeConsensusIService.A
   }
 
   @Override
-  public void pipeConsensusTransfer(
-      TPipeConsensusTransferReq req,
-      AsyncMethodCallback<TPipeConsensusTransferResp> resultHandler) {
-    try {
-      TPipeConsensusTransferResp resp = 
config.getConsensusPipeReceiver().receive(req);
-      // we need to call onComplete by hand
-      resultHandler.onComplete(resp);
-    } catch (Exception e) {
-      resultHandler.onError(e);
-    }
+  public TPipeConsensusTransferResp 
pipeConsensusTransfer(TPipeConsensusTransferReq req) {
+    return config.getConsensusPipeReceiver().receive(req);
   }
 
   // TODO: consider batch transfer
   @Override
-  public void pipeConsensusBatchTransfer(
-      TPipeConsensusBatchTransferReq req,
-      AsyncMethodCallback<TPipeConsensusBatchTransferResp> resultHandler)
-      throws TException {}
+  public TPipeConsensusBatchTransferResp pipeConsensusBatchTransfer(
+      TPipeConsensusBatchTransferReq req) throws TException {
+    return new TPipeConsensusBatchTransferResp();
+  }
 
   @Override
-  public void setActive(TSetActiveReq req, AsyncMethodCallback<TSetActiveResp> 
resultHandler)
-      throws TException {
+  public TSetActiveResp setActive(TSetActiveReq req) throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
     PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId);
@@ -92,18 +82,15 @@ public class PipeConsensusRPCServiceProcessor implements 
PipeConsensusIService.A
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TSetActiveResp(status));
-      return;
+      return new TSetActiveResp(status);
     }
     impl.setActive(req.isActive);
-    resultHandler.onComplete(new TSetActiveResp(RpcUtils.SUCCESS_STATUS));
+    return new TSetActiveResp(RpcUtils.SUCCESS_STATUS);
   }
 
   @Override
-  public void notifyPeerToCreateConsensusPipe(
-      TNotifyPeerToCreateConsensusPipeReq req,
-      AsyncMethodCallback<TNotifyPeerToCreateConsensusPipeResp> resultHandler)
-      throws TException {
+  public TNotifyPeerToCreateConsensusPipeResp notifyPeerToCreateConsensusPipe(
+      TNotifyPeerToCreateConsensusPipeReq req) throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId);
     PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId);
@@ -114,8 +101,7 @@ public class PipeConsensusRPCServiceProcessor implements 
PipeConsensusIService.A
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new 
TNotifyPeerToCreateConsensusPipeResp(status));
-      return;
+      return new TNotifyPeerToCreateConsensusPipeResp(status);
     }
     TSStatus responseStatus;
     try {
@@ -130,14 +116,12 @@ public class PipeConsensusRPCServiceProcessor implements 
PipeConsensusIService.A
       responseStatus.setMessage(e.getMessage());
       LOGGER.warn("Failed to create consensus pipe to target peer with req 
{}", req, e);
     }
-    resultHandler.onComplete(new 
TNotifyPeerToCreateConsensusPipeResp(responseStatus));
+    return new TNotifyPeerToCreateConsensusPipeResp(responseStatus);
   }
 
   @Override
-  public void notifyPeerToDropConsensusPipe(
-      TNotifyPeerToDropConsensusPipeReq req,
-      AsyncMethodCallback<TNotifyPeerToDropConsensusPipeResp> resultHandler)
-      throws TException {
+  public TNotifyPeerToDropConsensusPipeResp notifyPeerToDropConsensusPipe(
+      TNotifyPeerToDropConsensusPipeReq req) throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId);
     PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId);
@@ -148,8 +132,7 @@ public class PipeConsensusRPCServiceProcessor implements 
PipeConsensusIService.A
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TNotifyPeerToDropConsensusPipeResp(status));
-      return;
+      return new TNotifyPeerToDropConsensusPipeResp(status);
     }
     TSStatus responseStatus;
     try {
@@ -164,14 +147,12 @@ public class PipeConsensusRPCServiceProcessor implements 
PipeConsensusIService.A
       responseStatus.setMessage(e.getMessage());
       LOGGER.warn("Failed to drop consensus pipe to target peer with req {}", 
req, e);
     }
-    resultHandler.onComplete(new 
TNotifyPeerToDropConsensusPipeResp(responseStatus));
+    return new TNotifyPeerToDropConsensusPipeResp(responseStatus);
   }
 
   @Override
-  public void checkConsensusPipeCompleted(
-      TCheckConsensusPipeCompletedReq req,
-      AsyncMethodCallback<TCheckConsensusPipeCompletedResp> resultHandler)
-      throws TException {
+  public TCheckConsensusPipeCompletedResp checkConsensusPipeCompleted(
+      TCheckConsensusPipeCompletedReq req) throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
     PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId);
@@ -183,8 +164,7 @@ public class PipeConsensusRPCServiceProcessor implements 
PipeConsensusIService.A
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TCheckConsensusPipeCompletedResp(status, 
true));
-      return;
+      return new TCheckConsensusPipeCompletedResp(status, true);
     }
     TSStatus responseStatus;
     boolean isCompleted;
@@ -203,7 +183,7 @@ public class PipeConsensusRPCServiceProcessor implements 
PipeConsensusIService.A
           true,
           e);
     }
-    resultHandler.onComplete(new 
TCheckConsensusPipeCompletedResp(responseStatus, isCompleted));
+    return new TCheckConsensusPipeCompletedResp(responseStatus, isCompleted);
   }
 
   public void handleExit() {}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
index e160e2dd7de..35727458b55 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
@@ -76,16 +76,9 @@ public abstract class ThriftService implements IService {
     JMXService.deregisterMBean(mbeanName);
   }
 
-  boolean setSyncedImpl = false;
-  boolean setAsyncedImpl = false;
+  public void initSyncedServiceImpl(Object serviceImpl) {}
 
-  public void initSyncedServiceImpl(Object serviceImpl) {
-    setSyncedImpl = true;
-  }
-
-  public void initAsyncedServiceImpl(Object serviceImpl) {
-    setAsyncedImpl = true;
-  }
+  public void initAsyncServiceImpl(Object serviceImpl) {}
 
   public abstract void initTProcessor()
       throws ClassNotFoundException,
@@ -114,10 +107,6 @@ public abstract class ThriftService implements IService {
     try {
       reset();
       initTProcessor();
-      if (!setSyncedImpl && !setAsyncedImpl) {
-        throw new StartupException(
-            getID().getName(), "At least one service implementation should be 
set.");
-      }
       initThriftServiceThread();
       thriftServiceThread.setThreadStopLatch(stopLatch);
       thriftServiceThread.start();

Reply via email to