This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4fba35edd93f72575673e889a698a2df5eb31926 Author: 133tosakarin <[email protected]> AuthorDate: Fri Aug 2 09:47:34 2024 +0800 [fix] Change IoTConsensusService and PipeConsensusService from async to sync (#13077) (cherry picked from commit 0a611a8c7c45e3d4f6a27c4f39860dbf4289f0a1) --- .../apache/iotdb/consensus/iot/IoTConsensus.java | 2 +- .../iot/service/IoTConsensusRPCService.java | 14 +- .../service/IoTConsensusRPCServiceProcessor.java | 185 +++++++++------------ .../apache/iotdb/consensus/pipe/PipeConsensus.java | 2 +- .../pipe/service/PipeConsensusRPCService.java | 15 +- .../service/PipeConsensusRPCServiceProcessor.java | 64 +++---- .../iotdb/commons/service/ThriftService.java | 15 +- 7 files changed, 111 insertions(+), 186 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index b94781761e9..464311dd94d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -133,7 +133,7 @@ public class IoTConsensus implements IConsensus { @Override public synchronized void start() throws IOException { initAndRecover(); - service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this)); + service.initSyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this)); try { registerManager.register(service); } catch (StartupException e) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java index 1be3b67b3b1..8f7c484af4b 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java @@ -29,7 +29,6 @@ import org.apache.iotdb.consensus.config.IoTConsensusConfig; import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService; import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory; -import org.apache.thrift.TBaseAsyncProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,10 +53,10 @@ public class IoTConsensusRPCService extends ThriftService implements IoTConsensu } @Override - public void initAsyncedServiceImpl(Object iotConsensusRPCServiceProcessor) { + public void initSyncedServiceImpl(Object iotConsensusRPCServiceProcessor) { this.iotConsensusRPCServiceProcessor = (IoTConsensusRPCServiceProcessor) iotConsensusRPCServiceProcessor; - super.initAsyncedServiceImpl(this.iotConsensusRPCServiceProcessor); + super.initSyncedServiceImpl(iotConsensusRPCServiceProcessor); } @Override @@ -67,7 +66,7 @@ public class IoTConsensusRPCService extends ThriftService implements IoTConsensu InstantiationException, NoSuchMethodException, InvocationTargetException { - processor = new IoTConsensusIService.AsyncProcessor<>(iotConsensusRPCServiceProcessor); + processor = new IoTConsensusIService.Processor<>(iotConsensusRPCServiceProcessor); } @Override @@ -76,20 +75,15 @@ public class IoTConsensusRPCService extends ThriftService implements IoTConsensu try { thriftServiceThread = new ThriftServiceThread( - (TBaseAsyncProcessor<?>) processor, + processor, getID().getName(), ThreadName.IOT_CONSENSUS_RPC_PROCESSOR.getName(), getBindIP(), getBindPort(), - config.getRpc().getRpcSelectorThreadNum(), - config.getRpc().getRpcMinConcurrentClientNum(), config.getRpc().getRpcMaxConcurrentClientNum(), config.getRpc().getThriftServerAwaitTimeForStopService(), new IoTConsensusRPCServiceHandler(iotConsensusRPCServiceProcessor), config.getRpc().isRpcThriftCompressionEnabled(), - config.getRpc().getConnectionTimeoutInMs(), - config.getRpc().getThriftMaxFrameSize(), - ThriftServiceThread.ServerType.SELECTOR, ZeroCopyRpcTransportFactory.INSTANCE); } catch (RPCServiceException e) { throw new IllegalAccessException(e.getMessage()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index e29d48fed7c..a609acd2651 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -55,14 +55,13 @@ import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.stream.Collectors; -public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.AsyncIface { +public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Iface { private static final Logger LOGGER = LoggerFactory.getLogger(IoTConsensusRPCServiceProcessor.class); @@ -74,98 +73,85 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy } @Override - public void syncLogEntries( - TSyncLogEntriesReq req, AsyncMethodCallback<TSyncLogEntriesRes> resultHandler) { - try { - ConsensusGroupId groupId = - ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); - IoTConsensusServerImpl impl = consensus.getImpl(groupId); - if (impl == null) { - String message = - String.format( - "unexpected consensusGroupId %s for TSyncLogEntriesReq which size is %s", - groupId, req.getLogEntries().size()); - LOGGER.error(message); - TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); - status.setMessage(message); - resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status))); - return; - } - if (impl.isReadOnly()) { - String message = "fail to sync logEntries because system is read-only."; - LOGGER.error(message); - TSStatus status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()); - status.setMessage(message); - resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status))); - return; - } - if (!impl.isActive()) { - TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()); - status.setMessage("peer is inactive and not ready to receive sync log request"); - resultHandler.onComplete(new TSyncLogEntriesRes(Collections.singletonList(status))); - return; - } - BatchIndexedConsensusRequest logEntriesInThisBatch = - new BatchIndexedConsensusRequest(req.peerId); - // We use synchronized to ensure atomicity of executing multiple logs - for (TLogEntry entry : req.getLogEntries()) { - logEntriesInThisBatch.add( - impl.buildIndexedConsensusRequestForRemoteRequest( - entry.getSearchIndex(), - entry.getData().stream() - .map( - entry.isFromWAL() - ? IoTConsensusRequest::new - : ByteBufferConsensusRequest::new) - .collect(Collectors.toList()))); - } - long buildRequestTime = System.nanoTime(); - IConsensusRequest deserializedRequest = - impl.getStateMachine().deserializeRequest(logEntriesInThisBatch); - impl.getIoTConsensusServerMetrics() - .recordDeserializeCost(System.nanoTime() - buildRequestTime); - TSStatus writeStatus = - impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), deserializedRequest); - LOGGER.debug( - "execute TSyncLogEntriesReq for {} with result {}", - req.consensusGroupId, - writeStatus.subStatus); - resultHandler.onComplete(new TSyncLogEntriesRes(writeStatus.subStatus)); - } catch (Exception e) { - resultHandler.onError(e); + public TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req) { + ConsensusGroupId groupId = + ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); + IoTConsensusServerImpl impl = consensus.getImpl(groupId); + if (impl == null) { + String message = + String.format( + "unexpected consensusGroupId %s for TSyncLogEntriesReq which size is %s", + groupId, req.getLogEntries().size()); + LOGGER.error(message); + TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + status.setMessage(message); + return new TSyncLogEntriesRes(Collections.singletonList(status)); } + if (impl.isReadOnly()) { + String message = "fail to sync logEntries because system is read-only."; + LOGGER.error(message); + TSStatus status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()); + status.setMessage(message); + return new TSyncLogEntriesRes(Collections.singletonList(status)); + } + if (!impl.isActive()) { + TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()); + status.setMessage("peer is inactive and not ready to receive sync log request"); + return new TSyncLogEntriesRes(Collections.singletonList(status)); + } + BatchIndexedConsensusRequest logEntriesInThisBatch = + new BatchIndexedConsensusRequest(req.peerId); + // We use synchronized to ensure atomicity of executing multiple logs + for (TLogEntry entry : req.getLogEntries()) { + logEntriesInThisBatch.add( + impl.buildIndexedConsensusRequestForRemoteRequest( + entry.getSearchIndex(), + entry.getData().stream() + .map( + entry.isFromWAL() + ? IoTConsensusRequest::new + : ByteBufferConsensusRequest::new) + .collect(Collectors.toList()))); + } + long buildRequestTime = System.nanoTime(); + IConsensusRequest deserializedRequest = + impl.getStateMachine().deserializeRequest(logEntriesInThisBatch); + impl.getIoTConsensusServerMetrics().recordDeserializeCost(System.nanoTime() - buildRequestTime); + TSStatus writeStatus = + impl.syncLog(logEntriesInThisBatch.getSourcePeerId(), deserializedRequest); + LOGGER.debug( + "execute TSyncLogEntriesReq for {} with result {}", + req.consensusGroupId, + writeStatus.subStatus); + return new TSyncLogEntriesRes(writeStatus.subStatus); } @Override - public void inactivatePeer( - TInactivatePeerReq req, AsyncMethodCallback<TInactivatePeerRes> resultHandler) - throws TException { + public TInactivatePeerRes inactivatePeer(TInactivatePeerReq req) throws TException { if (req.isForDeletionPurpose()) { KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.BEFORE_INACTIVATE); } ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); IoTConsensusServerImpl impl = consensus.getImpl(groupId); + if (impl == null) { String message = String.format("unexpected consensusGroupId %s for inactivatePeer request", groupId); LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TInactivatePeerRes(status)); - return; + return new TInactivatePeerRes(status); } impl.setActive(false); - resultHandler.onComplete( - new TInactivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))); if (req.isForDeletionPurpose()) { KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.AFTER_INACTIVATE); } + return new TInactivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } @Override - public void activatePeer( - TActivatePeerReq req, AsyncMethodCallback<TActivatePeerRes> resultHandler) throws TException { + public TActivatePeerRes activatePeer(TActivatePeerReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); IoTConsensusServerImpl impl = consensus.getImpl(groupId); @@ -175,18 +161,15 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TActivatePeerRes(status)); - return; + return new TActivatePeerRes(status); } KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE); impl.setActive(true); - resultHandler.onComplete( - new TActivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))); + return new TActivatePeerRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } @Override - public void buildSyncLogChannel( - TBuildSyncLogChannelReq req, AsyncMethodCallback<TBuildSyncLogChannelRes> resultHandler) + public TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); @@ -197,8 +180,7 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TBuildSyncLogChannelRes(status)); - return; + return new TBuildSyncLogChannelRes(status); } TSStatus responseStatus; try { @@ -208,12 +190,11 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); responseStatus.setMessage(e.getMessage()); } - resultHandler.onComplete(new TBuildSyncLogChannelRes(responseStatus)); + return new TBuildSyncLogChannelRes(responseStatus); } @Override - public void removeSyncLogChannel( - TRemoveSyncLogChannelReq req, AsyncMethodCallback<TRemoveSyncLogChannelRes> resultHandler) + public TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); @@ -224,8 +205,7 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TRemoveSyncLogChannelRes(status)); - return; + return new TRemoveSyncLogChannelRes(status); } TSStatus responseStatus; try { @@ -235,12 +215,11 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); responseStatus.setMessage(e.getMessage()); } - resultHandler.onComplete(new TRemoveSyncLogChannelRes(responseStatus)); + return new TRemoveSyncLogChannelRes(responseStatus); } @Override - public void waitSyncLogComplete( - TWaitSyncLogCompleteReq req, AsyncMethodCallback<TWaitSyncLogCompleteRes> resultHandler) + public TWaitSyncLogCompleteRes waitSyncLogComplete(TWaitSyncLogCompleteReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); @@ -251,18 +230,15 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0)); - return; + return new TWaitSyncLogCompleteRes(true, 0, 0); } long searchIndex = impl.getSearchIndex(); long safeIndex = impl.getMinSyncIndex(); - resultHandler.onComplete( - new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex)); + return new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, safeIndex); } @Override - public void sendSnapshotFragment( - TSendSnapshotFragmentReq req, AsyncMethodCallback<TSendSnapshotFragmentRes> resultHandler) + public TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); @@ -273,8 +249,7 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TSendSnapshotFragmentRes(status)); - return; + return new TSendSnapshotFragmentRes(status); } TSStatus responseStatus; try { @@ -284,12 +259,11 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); responseStatus.setMessage(e.getMessage()); } - resultHandler.onComplete(new TSendSnapshotFragmentRes(responseStatus)); + return new TSendSnapshotFragmentRes(responseStatus); } @Override - public void triggerSnapshotLoad( - TTriggerSnapshotLoadReq req, AsyncMethodCallback<TTriggerSnapshotLoadRes> resultHandler) + public TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); @@ -300,20 +274,16 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TTriggerSnapshotLoadRes(status)); - return; + return new TTriggerSnapshotLoadRes(status); } impl.loadSnapshot(req.snapshotId); KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION); - resultHandler.onComplete( - new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))); + return new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } @Override - public void cleanupTransferredSnapshot( - TCleanupTransferredSnapshotReq req, - AsyncMethodCallback<TCleanupTransferredSnapshotRes> resultHandler) - throws TException { + public TCleanupTransferredSnapshotRes cleanupTransferredSnapshot( + TCleanupTransferredSnapshotReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); IoTConsensusServerImpl impl = consensus.getImpl(groupId); @@ -323,8 +293,7 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TCleanupTransferredSnapshotRes(status)); - return; + return new TCleanupTransferredSnapshotRes(status); } TSStatus responseStatus; try { @@ -335,7 +304,7 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Asy responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); responseStatus.setMessage(e.getMessage()); } - resultHandler.onComplete(new TCleanupTransferredSnapshotRes(responseStatus)); + return new TCleanupTransferredSnapshotRes(responseStatus); } public void handleClientExit() {} diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java index a6d87669269..7cd0470f977 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java @@ -116,7 +116,7 @@ public class PipeConsensus implements IConsensus { public synchronized void start() throws IOException { initAndRecover(); - rpcService.initAsyncedServiceImpl(new PipeConsensusRPCServiceProcessor(this, config.getPipe())); + rpcService.initSyncedServiceImpl(new PipeConsensusRPCServiceProcessor(this, config.getPipe())); try { registerManager.register(rpcService); } catch (StartupException e) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java index 83c80574e32..66bded8d13d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java @@ -29,8 +29,6 @@ import org.apache.iotdb.consensus.config.PipeConsensusConfig; import org.apache.iotdb.consensus.pipe.thrift.PipeConsensusIService; import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory; -import org.apache.thrift.TBaseAsyncProcessor; - public class PipeConsensusRPCService extends ThriftService implements PipeConsensusRPCServiceMBean { private final TEndPoint thisNode; @@ -48,15 +46,15 @@ public class PipeConsensusRPCService extends ThriftService implements PipeConsen } @Override - public void initAsyncedServiceImpl(Object pipeConsensusRPCServiceProcessor) { + public void initSyncedServiceImpl(Object pipeConsensusRPCServiceProcessor) { this.pipeConsensusRPCServiceProcessor = (PipeConsensusRPCServiceProcessor) pipeConsensusRPCServiceProcessor; - super.initAsyncedServiceImpl(this.pipeConsensusRPCServiceProcessor); + super.initSyncedServiceImpl(this.pipeConsensusRPCServiceProcessor); } @Override public void initTProcessor() { - processor = new PipeConsensusIService.AsyncProcessor<>(pipeConsensusRPCServiceProcessor); + processor = new PipeConsensusIService.Processor<>(pipeConsensusRPCServiceProcessor); } @Override @@ -64,20 +62,15 @@ public class PipeConsensusRPCService extends ThriftService implements PipeConsen try { thriftServiceThread = new ThriftServiceThread( - (TBaseAsyncProcessor<?>) processor, + processor, getID().getName(), ThreadName.PIPE_CONSENSUS_RPC_PROCESSOR.getName(), getBindIP(), getBindPort(), - config.getRpc().getRpcSelectorThreadNum(), - config.getRpc().getRpcMinConcurrentClientNum(), config.getRpc().getRpcMaxConcurrentClientNum(), config.getRpc().getThriftServerAwaitTimeForStopService(), new PipeConsensusRPCServiceHandler(pipeConsensusRPCServiceProcessor), config.getRpc().isRpcThriftCompressionEnabled(), - config.getRpc().getConnectionTimeoutInMs(), - config.getRpc().getThriftMaxFrameSize(), - ThriftServiceThread.ServerType.SELECTOR, ZeroCopyRpcTransportFactory.INSTANCE); } catch (RPCServiceException e) { throw new IllegalAccessException(e.getMessage()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java index e41fffb58e0..84a7cd8dbe3 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java @@ -43,11 +43,10 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.AsyncIface { +public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.Iface { private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusRPCServiceProcessor.class); private final PipeConsensus pipeConsensus; @@ -61,28 +60,19 @@ public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.A } @Override - public void pipeConsensusTransfer( - TPipeConsensusTransferReq req, - AsyncMethodCallback<TPipeConsensusTransferResp> resultHandler) { - try { - TPipeConsensusTransferResp resp = config.getConsensusPipeReceiver().receive(req); - // we need to call onComplete by hand - resultHandler.onComplete(resp); - } catch (Exception e) { - resultHandler.onError(e); - } + public TPipeConsensusTransferResp pipeConsensusTransfer(TPipeConsensusTransferReq req) { + return config.getConsensusPipeReceiver().receive(req); } // TODO: consider batch transfer @Override - public void pipeConsensusBatchTransfer( - TPipeConsensusBatchTransferReq req, - AsyncMethodCallback<TPipeConsensusBatchTransferResp> resultHandler) - throws TException {} + public TPipeConsensusBatchTransferResp pipeConsensusBatchTransfer( + TPipeConsensusBatchTransferReq req) throws TException { + return new TPipeConsensusBatchTransferResp(); + } @Override - public void setActive(TSetActiveReq req, AsyncMethodCallback<TSetActiveResp> resultHandler) - throws TException { + public TSetActiveResp setActive(TSetActiveReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId); PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId); @@ -92,18 +82,15 @@ public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.A LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TSetActiveResp(status)); - return; + return new TSetActiveResp(status); } impl.setActive(req.isActive); - resultHandler.onComplete(new TSetActiveResp(RpcUtils.SUCCESS_STATUS)); + return new TSetActiveResp(RpcUtils.SUCCESS_STATUS); } @Override - public void notifyPeerToCreateConsensusPipe( - TNotifyPeerToCreateConsensusPipeReq req, - AsyncMethodCallback<TNotifyPeerToCreateConsensusPipeResp> resultHandler) - throws TException { + public TNotifyPeerToCreateConsensusPipeResp notifyPeerToCreateConsensusPipe( + TNotifyPeerToCreateConsensusPipeReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId); PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId); @@ -114,8 +101,7 @@ public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.A LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TNotifyPeerToCreateConsensusPipeResp(status)); - return; + return new TNotifyPeerToCreateConsensusPipeResp(status); } TSStatus responseStatus; try { @@ -130,14 +116,12 @@ public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.A responseStatus.setMessage(e.getMessage()); LOGGER.warn("Failed to create consensus pipe to target peer with req {}", req, e); } - resultHandler.onComplete(new TNotifyPeerToCreateConsensusPipeResp(responseStatus)); + return new TNotifyPeerToCreateConsensusPipeResp(responseStatus); } @Override - public void notifyPeerToDropConsensusPipe( - TNotifyPeerToDropConsensusPipeReq req, - AsyncMethodCallback<TNotifyPeerToDropConsensusPipeResp> resultHandler) - throws TException { + public TNotifyPeerToDropConsensusPipeResp notifyPeerToDropConsensusPipe( + TNotifyPeerToDropConsensusPipeReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId); PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId); @@ -148,8 +132,7 @@ public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.A LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TNotifyPeerToDropConsensusPipeResp(status)); - return; + return new TNotifyPeerToDropConsensusPipeResp(status); } TSStatus responseStatus; try { @@ -164,14 +147,12 @@ public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.A responseStatus.setMessage(e.getMessage()); LOGGER.warn("Failed to drop consensus pipe to target peer with req {}", req, e); } - resultHandler.onComplete(new TNotifyPeerToDropConsensusPipeResp(responseStatus)); + return new TNotifyPeerToDropConsensusPipeResp(responseStatus); } @Override - public void checkConsensusPipeCompleted( - TCheckConsensusPipeCompletedReq req, - AsyncMethodCallback<TCheckConsensusPipeCompletedResp> resultHandler) - throws TException { + public TCheckConsensusPipeCompletedResp checkConsensusPipeCompleted( + TCheckConsensusPipeCompletedReq req) throws TException { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId); PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId); @@ -183,8 +164,7 @@ public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.A LOGGER.error(message); TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); status.setMessage(message); - resultHandler.onComplete(new TCheckConsensusPipeCompletedResp(status, true)); - return; + return new TCheckConsensusPipeCompletedResp(status, true); } TSStatus responseStatus; boolean isCompleted; @@ -203,7 +183,7 @@ public class PipeConsensusRPCServiceProcessor implements PipeConsensusIService.A true, e); } - resultHandler.onComplete(new TCheckConsensusPipeCompletedResp(responseStatus, isCompleted)); + return new TCheckConsensusPipeCompletedResp(responseStatus, isCompleted); } public void handleExit() {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java index e160e2dd7de..35727458b55 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java @@ -76,16 +76,9 @@ public abstract class ThriftService implements IService { JMXService.deregisterMBean(mbeanName); } - boolean setSyncedImpl = false; - boolean setAsyncedImpl = false; + public void initSyncedServiceImpl(Object serviceImpl) {} - public void initSyncedServiceImpl(Object serviceImpl) { - setSyncedImpl = true; - } - - public void initAsyncedServiceImpl(Object serviceImpl) { - setAsyncedImpl = true; - } + public void initAsyncServiceImpl(Object serviceImpl) {} public abstract void initTProcessor() throws ClassNotFoundException, @@ -114,10 +107,6 @@ public abstract class ThriftService implements IService { try { reset(); initTProcessor(); - if (!setSyncedImpl && !setAsyncedImpl) { - throw new StartupException( - getID().getName(), "At least one service implementation should be set."); - } initThriftServiceThread(); thriftServiceThread.setThreadStopLatch(stopLatch); thriftServiceThread.start();
