This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rc/1.3.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.2 by this push:
     new e24ebd36ed4 [fix] Change IoTConsensusService and PipeConsensusService from async to sync (#13077)
e24ebd36ed4 is described below

commit e24ebd36ed4ca8075be3f55fa7fed5059b7ba9ac
Author: 133tosakarin <[email protected]>
AuthorDate: Fri Aug 2 09:47:34 2024 +0800

    [fix] Change IoTConsensusService and PipeConsensusService from async to sync (#13077)
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |   2 +-
 .../iot/service/IoTConsensusRPCService.java        |  20 +--
 .../service/IoTConsensusRPCServiceProcessor.java   | 185 +++++++++------------
 .../iotdb/commons/service/ThriftService.java       |  15 +-
 4 files changed, 85 insertions(+), 137 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 46d911608f9..b380a100ccf 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -133,7 +133,7 @@ public class IoTConsensus implements IConsensus {
   @Override
   public synchronized void start() throws IOException {
     initAndRecover();
-    service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
+    service.initSyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
     try {
       registerManager.register(service);
     } catch (StartupException e) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
index 7be6dbefc99..b41281043bc 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
@@ -29,12 +29,9 @@ import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
 import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
 
-import org.apache.thrift.TBaseAsyncProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.InvocationTargetException;
-
 public class IoTConsensusRPCService extends ThriftService implements 
IoTConsensusRPCServiceMBean {
 
   private static final Logger logger = 
LoggerFactory.getLogger(IoTConsensusRPCService.class);
@@ -54,17 +51,15 @@ public class IoTConsensusRPCService extends ThriftService 
implements IoTConsensu
   }
 
   @Override
-  public void initAsyncedServiceImpl(Object iotConsensusRPCServiceProcessor) {
+  public void initSyncedServiceImpl(Object iotConsensusRPCServiceProcessor) {
     this.iotConsensusRPCServiceProcessor =
         (IoTConsensusRPCServiceProcessor) iotConsensusRPCServiceProcessor;
-    super.initAsyncedServiceImpl(this.iotConsensusRPCServiceProcessor);
+    super.initSyncedServiceImpl(iotConsensusRPCServiceProcessor);
   }
 
   @Override
-  public void initTProcessor()
-      throws ClassNotFoundException, IllegalAccessException, 
InstantiationException,
-          NoSuchMethodException, InvocationTargetException {
-    processor = new 
IoTConsensusIService.AsyncProcessor<>(iotConsensusRPCServiceProcessor);
+  public void initTProcessor() {
+    processor = new 
IoTConsensusIService.Processor<>(iotConsensusRPCServiceProcessor);
   }
 
   @Override
@@ -73,20 +68,15 @@ public class IoTConsensusRPCService extends ThriftService 
implements IoTConsensu
     try {
       thriftServiceThread =
           new ThriftServiceThread(
-              (TBaseAsyncProcessor<?>) processor,
+              processor,
               getID().getName(),
               ThreadName.IOT_CONSENSUS_RPC_PROCESSOR.getName(),
               getBindIP(),
               getBindPort(),
-              config.getRpc().getRpcSelectorThreadNum(),
-              config.getRpc().getRpcMinConcurrentClientNum(),
               config.getRpc().getRpcMaxConcurrentClientNum(),
               config.getRpc().getThriftServerAwaitTimeForStopService(),
               new 
IoTConsensusRPCServiceHandler(iotConsensusRPCServiceProcessor),
               config.getRpc().isRpcThriftCompressionEnabled(),
-              config.getRpc().getConnectionTimeoutInMs(),
-              config.getRpc().getThriftMaxFrameSize(),
-              ThriftServiceThread.ServerType.SELECTOR,
               ZeroCopyRpcTransportFactory.INSTANCE);
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 1c5f5354c82..185907334d6 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -55,14 +55,13 @@ import 
org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.stream.Collectors;
 
-public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.AsyncIface {
+public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Iface {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(IoTConsensusRPCServiceProcessor.class);
@@ -74,98 +73,85 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
   }
 
   @Override
-  public void syncLogEntries(
-      TSyncLogEntriesReq req, AsyncMethodCallback<TSyncLogEntriesRes> 
resultHandler) {
-    try {
-      ConsensusGroupId groupId =
-          
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-      IoTConsensusServerImpl impl = consensus.getImpl(groupId);
-      if (impl == null) {
-        String message =
-            String.format(
-                "unexpected consensusGroupId %s for TSyncLogEntriesReq which 
size is %s",
-                groupId, req.getLogEntries().size());
-        LOGGER.error(message);
-        TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-        status.setMessage(message);
-        resultHandler.onComplete(new 
TSyncLogEntriesRes(Collections.singletonList(status)));
-        return;
-      }
-      if (impl.isReadOnly()) {
-        String message = "fail to sync logEntries because system is 
read-only.";
-        LOGGER.error(message);
-        TSStatus status = new 
TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
-        status.setMessage(message);
-        resultHandler.onComplete(new 
TSyncLogEntriesRes(Collections.singletonList(status)));
-        return;
-      }
-      if (!impl.isActive()) {
-        TSStatus status = new 
TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
-        status.setMessage("peer is inactive and not ready to receive sync log 
request");
-        resultHandler.onComplete(new 
TSyncLogEntriesRes(Collections.singletonList(status)));
-        return;
-      }
-      BatchIndexedConsensusRequest logEntriesInThisBatch =
-          new BatchIndexedConsensusRequest(req.peerId);
-      // We use synchronized to ensure atomicity of executing multiple logs
-      for (TLogEntry entry : req.getLogEntries()) {
-        logEntriesInThisBatch.add(
-            impl.buildIndexedConsensusRequestForRemoteRequest(
-                entry.getSearchIndex(),
-                entry.getData().stream()
-                    .map(
-                        entry.isFromWAL()
-                            ? IoTConsensusRequest::new
-                            : ByteBufferConsensusRequest::new)
-                    .collect(Collectors.toList())));
-      }
-      long buildRequestTime = System.nanoTime();
-      IConsensusRequest deserializedRequest =
-          impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
-      impl.getIoTConsensusServerMetrics()
-          .recordDeserializeCost(System.nanoTime() - buildRequestTime);
-      TSStatus writeStatus =
-          impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), 
deserializedRequest);
-      LOGGER.debug(
-          "execute TSyncLogEntriesReq for {} with result {}",
-          req.consensusGroupId,
-          writeStatus.subStatus);
-      resultHandler.onComplete(new TSyncLogEntriesRes(writeStatus.subStatus));
-    } catch (Exception e) {
-      resultHandler.onError(e);
+  public TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req) {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    IoTConsensusServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format(
+              "unexpected consensusGroupId %s for TSyncLogEntriesReq which 
size is %s",
+              groupId, req.getLogEntries().size());
+      LOGGER.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      return new TSyncLogEntriesRes(Collections.singletonList(status));
     }
+    if (impl.isReadOnly()) {
+      String message = "fail to sync logEntries because system is read-only.";
+      LOGGER.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
+      status.setMessage(message);
+      return new TSyncLogEntriesRes(Collections.singletonList(status));
+    }
+    if (!impl.isActive()) {
+      TSStatus status = new 
TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
+      status.setMessage("peer is inactive and not ready to receive sync log 
request");
+      return new TSyncLogEntriesRes(Collections.singletonList(status));
+    }
+    BatchIndexedConsensusRequest logEntriesInThisBatch =
+        new BatchIndexedConsensusRequest(req.peerId);
+    // We use synchronized to ensure atomicity of executing multiple logs
+    for (TLogEntry entry : req.getLogEntries()) {
+      logEntriesInThisBatch.add(
+          impl.buildIndexedConsensusRequestForRemoteRequest(
+              entry.getSearchIndex(),
+              entry.getData().stream()
+                  .map(
+                      entry.isFromWAL()
+                          ? IoTConsensusRequest::new
+                          : ByteBufferConsensusRequest::new)
+                  .collect(Collectors.toList())));
+    }
+    long buildRequestTime = System.nanoTime();
+    IConsensusRequest deserializedRequest =
+        impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
+    
impl.getIoTConsensusServerMetrics().recordDeserializeCost(System.nanoTime() - 
buildRequestTime);
+    TSStatus writeStatus =
+        impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), 
deserializedRequest);
+    LOGGER.debug(
+        "execute TSyncLogEntriesReq for {} with result {}",
+        req.consensusGroupId,
+        writeStatus.subStatus);
+    return new TSyncLogEntriesRes(writeStatus.subStatus);
   }
 
   @Override
-  public void inactivatePeer(
-      TInactivatePeerReq req, AsyncMethodCallback<TInactivatePeerRes> 
resultHandler)
-      throws TException {
+  public TInactivatePeerRes inactivatePeer(TInactivatePeerReq req) throws 
TException {
     if (req.isForDeletionPurpose()) {
       
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.BEFORE_INACTIVATE);
     }
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     IoTConsensusServerImpl impl = consensus.getImpl(groupId);
+
     if (impl == null) {
       String message =
           String.format("unexpected consensusGroupId %s for inactivatePeer 
request", groupId);
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TInactivatePeerRes(status));
-      return;
+      return new TInactivatePeerRes(status);
     }
     impl.setActive(false);
-    resultHandler.onComplete(
-        new TInactivatePeerRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
     if (req.isForDeletionPurpose()) {
       
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.AFTER_INACTIVATE);
     }
+    return new TInactivatePeerRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
   @Override
-  public void activatePeer(
-      TActivatePeerReq req, AsyncMethodCallback<TActivatePeerRes> 
resultHandler) throws TException {
+  public TActivatePeerRes activatePeer(TActivatePeerReq req) throws TException 
{
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     IoTConsensusServerImpl impl = consensus.getImpl(groupId);
@@ -175,18 +161,15 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TActivatePeerRes(status));
-      return;
+      return new TActivatePeerRes(status);
     }
     KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
     impl.setActive(true);
-    resultHandler.onComplete(
-        new TActivatePeerRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+    return new TActivatePeerRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
   @Override
-  public void buildSyncLogChannel(
-      TBuildSyncLogChannelReq req, 
AsyncMethodCallback<TBuildSyncLogChannelRes> resultHandler)
+  public TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq 
req)
       throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -197,8 +180,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TBuildSyncLogChannelRes(status));
-      return;
+      return new TBuildSyncLogChannelRes(status);
     }
     TSStatus responseStatus;
     try {
@@ -208,12 +190,11 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
-    resultHandler.onComplete(new TBuildSyncLogChannelRes(responseStatus));
+    return new TBuildSyncLogChannelRes(responseStatus);
   }
 
   @Override
-  public void removeSyncLogChannel(
-      TRemoveSyncLogChannelReq req, 
AsyncMethodCallback<TRemoveSyncLogChannelRes> resultHandler)
+  public TRemoveSyncLogChannelRes 
removeSyncLogChannel(TRemoveSyncLogChannelReq req)
       throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -224,8 +205,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TRemoveSyncLogChannelRes(status));
-      return;
+      return new TRemoveSyncLogChannelRes(status);
     }
     TSStatus responseStatus;
     try {
@@ -235,12 +215,11 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
-    resultHandler.onComplete(new TRemoveSyncLogChannelRes(responseStatus));
+    return new TRemoveSyncLogChannelRes(responseStatus);
   }
 
   @Override
-  public void waitSyncLogComplete(
-      TWaitSyncLogCompleteReq req, 
AsyncMethodCallback<TWaitSyncLogCompleteRes> resultHandler)
+  public TWaitSyncLogCompleteRes waitSyncLogComplete(TWaitSyncLogCompleteReq 
req)
       throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -251,18 +230,15 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
-      return;
+      return new TWaitSyncLogCompleteRes(true, 0, 0);
     }
     long searchIndex = impl.getSearchIndex();
     long safeIndex = impl.getMinSyncIndex();
-    resultHandler.onComplete(
-        new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, 
safeIndex));
+    return new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, 
safeIndex);
   }
 
   @Override
-  public void sendSnapshotFragment(
-      TSendSnapshotFragmentReq req, 
AsyncMethodCallback<TSendSnapshotFragmentRes> resultHandler)
+  public TSendSnapshotFragmentRes 
sendSnapshotFragment(TSendSnapshotFragmentReq req)
       throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -273,8 +249,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TSendSnapshotFragmentRes(status));
-      return;
+      return new TSendSnapshotFragmentRes(status);
     }
     TSStatus responseStatus;
     try {
@@ -284,12 +259,11 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
-    resultHandler.onComplete(new TSendSnapshotFragmentRes(responseStatus));
+    return new TSendSnapshotFragmentRes(responseStatus);
   }
 
   @Override
-  public void triggerSnapshotLoad(
-      TTriggerSnapshotLoadReq req, 
AsyncMethodCallback<TTriggerSnapshotLoadRes> resultHandler)
+  public TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq 
req)
       throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -300,20 +274,16 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TTriggerSnapshotLoadRes(status));
-      return;
+      return new TTriggerSnapshotLoadRes(status);
     }
     impl.loadSnapshot(req.snapshotId);
     KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
-    resultHandler.onComplete(
-        new TTriggerSnapshotLoadRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+    return new TTriggerSnapshotLoadRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
   @Override
-  public void cleanupTransferredSnapshot(
-      TCleanupTransferredSnapshotReq req,
-      AsyncMethodCallback<TCleanupTransferredSnapshotRes> resultHandler)
-      throws TException {
+  public TCleanupTransferredSnapshotRes cleanupTransferredSnapshot(
+      TCleanupTransferredSnapshotReq req) throws TException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
     IoTConsensusServerImpl impl = consensus.getImpl(groupId);
@@ -323,8 +293,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       LOGGER.error(message);
       TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       status.setMessage(message);
-      resultHandler.onComplete(new TCleanupTransferredSnapshotRes(status));
-      return;
+      return new TCleanupTransferredSnapshotRes(status);
     }
     TSStatus responseStatus;
     try {
@@ -335,7 +304,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Asy
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
       responseStatus.setMessage(e.getMessage());
     }
-    resultHandler.onComplete(new 
TCleanupTransferredSnapshotRes(responseStatus));
+    return new TCleanupTransferredSnapshotRes(responseStatus);
   }
 
   public void handleClientExit() {}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
index 98341903d2c..f214de293ef 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
@@ -76,16 +76,9 @@ public abstract class ThriftService implements IService {
     JMXService.deregisterMBean(mbeanName);
   }
 
-  boolean setSyncedImpl = false;
-  boolean setAsyncedImpl = false;
+  public void initSyncedServiceImpl(Object serviceImpl) {}
 
-  public void initSyncedServiceImpl(Object serviceImpl) {
-    setSyncedImpl = true;
-  }
-
-  public void initAsyncedServiceImpl(Object serviceImpl) {
-    setAsyncedImpl = true;
-  }
+  public void initAsyncServiceImpl(Object serviceImpl) {}
 
   public abstract void initTProcessor()
       throws ClassNotFoundException, IllegalAccessException, 
InstantiationException,
@@ -111,10 +104,6 @@ public abstract class ThriftService implements IService {
     try {
       reset();
       initTProcessor();
-      if (!setSyncedImpl && !setAsyncedImpl) {
-        throw new StartupException(
-            getID().getName(), "At least one service implementation should be 
set.");
-      }
       initThriftServiceThread();
       thriftServiceThread.setThreadStopLatch(stopLatch);
       thriftServiceThread.start();

Reply via email to