This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0a611a8c7c4 [fix] Change IoTConsensusService and PipeConsensusService
from async to sync (#13077)
0a611a8c7c4 is described below
commit 0a611a8c7c45e3d4f6a27c4f39860dbf4289f0a1
Author: 133tosakarin <[email protected]>
AuthorDate: Fri Aug 2 09:47:34 2024 +0800
[fix] Change IoTConsensusService and PipeConsensusService from async to
sync (#13077)
---
.../apache/iotdb/consensus/iot/IoTConsensus.java | 2 +-
.../iot/service/IoTConsensusRPCService.java | 14 +-
.../service/IoTConsensusRPCServiceProcessor.java | 185 +++++++++------------
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 2 +-
.../pipe/service/PipeConsensusRPCService.java | 15 +-
.../service/PipeConsensusRPCServiceProcessor.java | 64 +++----
.../iotdb/commons/service/ThriftService.java | 15 +-
7 files changed, 111 insertions(+), 186 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index b94781761e9..464311dd94d 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -133,7 +133,7 @@ public class IoTConsensus implements IConsensus {
@Override
public synchronized void start() throws IOException {
initAndRecover();
- service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
+ service.initSyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
try {
registerManager.register(service);
} catch (StartupException e) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
index 1be3b67b3b1..8f7c484af4b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCService.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
-import org.apache.thrift.TBaseAsyncProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,10 +53,10 @@ public class IoTConsensusRPCService extends ThriftService
implements IoTConsensu
}
@Override
- public void initAsyncedServiceImpl(Object iotConsensusRPCServiceProcessor) {
+ public void initSyncedServiceImpl(Object iotConsensusRPCServiceProcessor) {
this.iotConsensusRPCServiceProcessor =
(IoTConsensusRPCServiceProcessor) iotConsensusRPCServiceProcessor;
- super.initAsyncedServiceImpl(this.iotConsensusRPCServiceProcessor);
+ super.initSyncedServiceImpl(iotConsensusRPCServiceProcessor);
}
@Override
@@ -67,7 +66,7 @@ public class IoTConsensusRPCService extends ThriftService
implements IoTConsensu
InstantiationException,
NoSuchMethodException,
InvocationTargetException {
- processor = new
IoTConsensusIService.AsyncProcessor<>(iotConsensusRPCServiceProcessor);
+ processor = new
IoTConsensusIService.Processor<>(iotConsensusRPCServiceProcessor);
}
@Override
@@ -76,20 +75,15 @@ public class IoTConsensusRPCService extends ThriftService
implements IoTConsensu
try {
thriftServiceThread =
new ThriftServiceThread(
- (TBaseAsyncProcessor<?>) processor,
+ processor,
getID().getName(),
ThreadName.IOT_CONSENSUS_RPC_PROCESSOR.getName(),
getBindIP(),
getBindPort(),
- config.getRpc().getRpcSelectorThreadNum(),
- config.getRpc().getRpcMinConcurrentClientNum(),
config.getRpc().getRpcMaxConcurrentClientNum(),
config.getRpc().getThriftServerAwaitTimeForStopService(),
new
IoTConsensusRPCServiceHandler(iotConsensusRPCServiceProcessor),
config.getRpc().isRpcThriftCompressionEnabled(),
- config.getRpc().getConnectionTimeoutInMs(),
- config.getRpc().getThriftMaxFrameSize(),
- ThriftServiceThread.ServerType.SELECTOR,
ZeroCopyRpcTransportFactory.INSTANCE);
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index e29d48fed7c..a609acd2651 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -55,14 +55,13 @@ import
org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.stream.Collectors;
-public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.AsyncIface {
+public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Iface {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTConsensusRPCServiceProcessor.class);
@@ -74,98 +73,85 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
}
@Override
- public void syncLogEntries(
- TSyncLogEntriesReq req, AsyncMethodCallback<TSyncLogEntriesRes>
resultHandler) {
- try {
- ConsensusGroupId groupId =
-
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- IoTConsensusServerImpl impl = consensus.getImpl(groupId);
- if (impl == null) {
- String message =
- String.format(
- "unexpected consensusGroupId %s for TSyncLogEntriesReq which
size is %s",
- groupId, req.getLogEntries().size());
- LOGGER.error(message);
- TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- status.setMessage(message);
- resultHandler.onComplete(new
TSyncLogEntriesRes(Collections.singletonList(status)));
- return;
- }
- if (impl.isReadOnly()) {
- String message = "fail to sync logEntries because system is
read-only.";
- LOGGER.error(message);
- TSStatus status = new
TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
- status.setMessage(message);
- resultHandler.onComplete(new
TSyncLogEntriesRes(Collections.singletonList(status)));
- return;
- }
- if (!impl.isActive()) {
- TSStatus status = new
TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
- status.setMessage("peer is inactive and not ready to receive sync log
request");
- resultHandler.onComplete(new
TSyncLogEntriesRes(Collections.singletonList(status)));
- return;
- }
- BatchIndexedConsensusRequest logEntriesInThisBatch =
- new BatchIndexedConsensusRequest(req.peerId);
- // We use synchronized to ensure atomicity of executing multiple logs
- for (TLogEntry entry : req.getLogEntries()) {
- logEntriesInThisBatch.add(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- entry.getSearchIndex(),
- entry.getData().stream()
- .map(
- entry.isFromWAL()
- ? IoTConsensusRequest::new
- : ByteBufferConsensusRequest::new)
- .collect(Collectors.toList())));
- }
- long buildRequestTime = System.nanoTime();
- IConsensusRequest deserializedRequest =
- impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
- impl.getIoTConsensusServerMetrics()
- .recordDeserializeCost(System.nanoTime() - buildRequestTime);
- TSStatus writeStatus =
- impl.syncLog(logEntriesInThisBatch.getSourcePeerId(),
deserializedRequest);
- LOGGER.debug(
- "execute TSyncLogEntriesReq for {} with result {}",
- req.consensusGroupId,
- writeStatus.subStatus);
- resultHandler.onComplete(new TSyncLogEntriesRes(writeStatus.subStatus));
- } catch (Exception e) {
- resultHandler.onError(e);
+ public TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req) {
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ IoTConsensusServerImpl impl = consensus.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format(
+ "unexpected consensusGroupId %s for TSyncLogEntriesReq which
size is %s",
+ groupId, req.getLogEntries().size());
+ LOGGER.error(message);
+ TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ return new TSyncLogEntriesRes(Collections.singletonList(status));
}
+ if (impl.isReadOnly()) {
+ String message = "fail to sync logEntries because system is read-only.";
+ LOGGER.error(message);
+ TSStatus status = new
TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
+ status.setMessage(message);
+ return new TSyncLogEntriesRes(Collections.singletonList(status));
+ }
+ if (!impl.isActive()) {
+ TSStatus status = new
TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
+ status.setMessage("peer is inactive and not ready to receive sync log
request");
+ return new TSyncLogEntriesRes(Collections.singletonList(status));
+ }
+ BatchIndexedConsensusRequest logEntriesInThisBatch =
+ new BatchIndexedConsensusRequest(req.peerId);
+ // We use synchronized to ensure atomicity of executing multiple logs
+ for (TLogEntry entry : req.getLogEntries()) {
+ logEntriesInThisBatch.add(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ entry.getSearchIndex(),
+ entry.getData().stream()
+ .map(
+ entry.isFromWAL()
+ ? IoTConsensusRequest::new
+ : ByteBufferConsensusRequest::new)
+ .collect(Collectors.toList())));
+ }
+ long buildRequestTime = System.nanoTime();
+ IConsensusRequest deserializedRequest =
+ impl.getStateMachine().deserializeRequest(logEntriesInThisBatch);
+
impl.getIoTConsensusServerMetrics().recordDeserializeCost(System.nanoTime() -
buildRequestTime);
+ TSStatus writeStatus =
+ impl.syncLog(logEntriesInThisBatch.getSourcePeerId(),
deserializedRequest);
+ LOGGER.debug(
+ "execute TSyncLogEntriesReq for {} with result {}",
+ req.consensusGroupId,
+ writeStatus.subStatus);
+ return new TSyncLogEntriesRes(writeStatus.subStatus);
}
@Override
- public void inactivatePeer(
- TInactivatePeerReq req, AsyncMethodCallback<TInactivatePeerRes>
resultHandler)
- throws TException {
+ public TInactivatePeerRes inactivatePeer(TInactivatePeerReq req) throws
TException {
if (req.isForDeletionPurpose()) {
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.BEFORE_INACTIVATE);
}
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
+
if (impl == null) {
String message =
String.format("unexpected consensusGroupId %s for inactivatePeer
request", groupId);
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new TInactivatePeerRes(status));
- return;
+ return new TInactivatePeerRes(status);
}
impl.setActive(false);
- resultHandler.onComplete(
- new TInactivatePeerRes(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
if (req.isForDeletionPurpose()) {
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.AFTER_INACTIVATE);
}
+ return new TInactivatePeerRes(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
@Override
- public void activatePeer(
- TActivatePeerReq req, AsyncMethodCallback<TActivatePeerRes>
resultHandler) throws TException {
+ public TActivatePeerRes activatePeer(TActivatePeerReq req) throws TException
{
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
@@ -175,18 +161,15 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new TActivatePeerRes(status));
- return;
+ return new TActivatePeerRes(status);
}
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
impl.setActive(true);
- resultHandler.onComplete(
- new TActivatePeerRes(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ return new TActivatePeerRes(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
@Override
- public void buildSyncLogChannel(
- TBuildSyncLogChannelReq req,
AsyncMethodCallback<TBuildSyncLogChannelRes> resultHandler)
+ public TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq
req)
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -197,8 +180,7 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new TBuildSyncLogChannelRes(status));
- return;
+ return new TBuildSyncLogChannelRes(status);
}
TSStatus responseStatus;
try {
@@ -208,12 +190,11 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
- resultHandler.onComplete(new TBuildSyncLogChannelRes(responseStatus));
+ return new TBuildSyncLogChannelRes(responseStatus);
}
@Override
- public void removeSyncLogChannel(
- TRemoveSyncLogChannelReq req,
AsyncMethodCallback<TRemoveSyncLogChannelRes> resultHandler)
+ public TRemoveSyncLogChannelRes
removeSyncLogChannel(TRemoveSyncLogChannelReq req)
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -224,8 +205,7 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new TRemoveSyncLogChannelRes(status));
- return;
+ return new TRemoveSyncLogChannelRes(status);
}
TSStatus responseStatus;
try {
@@ -235,12 +215,11 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
- resultHandler.onComplete(new TRemoveSyncLogChannelRes(responseStatus));
+ return new TRemoveSyncLogChannelRes(responseStatus);
}
@Override
- public void waitSyncLogComplete(
- TWaitSyncLogCompleteReq req,
AsyncMethodCallback<TWaitSyncLogCompleteRes> resultHandler)
+ public TWaitSyncLogCompleteRes waitSyncLogComplete(TWaitSyncLogCompleteReq
req)
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -251,18 +230,15 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0));
- return;
+ return new TWaitSyncLogCompleteRes(true, 0, 0);
}
long searchIndex = impl.getSearchIndex();
long safeIndex = impl.getMinSyncIndex();
- resultHandler.onComplete(
- new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex,
safeIndex));
+ return new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex,
safeIndex);
}
@Override
- public void sendSnapshotFragment(
- TSendSnapshotFragmentReq req,
AsyncMethodCallback<TSendSnapshotFragmentRes> resultHandler)
+ public TSendSnapshotFragmentRes
sendSnapshotFragment(TSendSnapshotFragmentReq req)
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -273,8 +249,7 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new TSendSnapshotFragmentRes(status));
- return;
+ return new TSendSnapshotFragmentRes(status);
}
TSStatus responseStatus;
try {
@@ -284,12 +259,11 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
- resultHandler.onComplete(new TSendSnapshotFragmentRes(responseStatus));
+ return new TSendSnapshotFragmentRes(responseStatus);
}
@Override
- public void triggerSnapshotLoad(
- TTriggerSnapshotLoadReq req,
AsyncMethodCallback<TTriggerSnapshotLoadRes> resultHandler)
+ public TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq
req)
throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -300,20 +274,16 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new TTriggerSnapshotLoadRes(status));
- return;
+ return new TTriggerSnapshotLoadRes(status);
}
impl.loadSnapshot(req.snapshotId);
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
- resultHandler.onComplete(
- new TTriggerSnapshotLoadRes(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
+ return new TTriggerSnapshotLoadRes(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
@Override
- public void cleanupTransferredSnapshot(
- TCleanupTransferredSnapshotReq req,
- AsyncMethodCallback<TCleanupTransferredSnapshotRes> resultHandler)
- throws TException {
+ public TCleanupTransferredSnapshotRes cleanupTransferredSnapshot(
+ TCleanupTransferredSnapshotReq req) throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
IoTConsensusServerImpl impl = consensus.getImpl(groupId);
@@ -323,8 +293,7 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new TCleanupTransferredSnapshotRes(status));
- return;
+ return new TCleanupTransferredSnapshotRes(status);
}
TSStatus responseStatus;
try {
@@ -335,7 +304,7 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
- resultHandler.onComplete(new
TCleanupTransferredSnapshotRes(responseStatus));
+ return new TCleanupTransferredSnapshotRes(responseStatus);
}
public void handleClientExit() {}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index a6d87669269..7cd0470f977 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -116,7 +116,7 @@ public class PipeConsensus implements IConsensus {
public synchronized void start() throws IOException {
initAndRecover();
- rpcService.initAsyncedServiceImpl(new
PipeConsensusRPCServiceProcessor(this, config.getPipe()));
+ rpcService.initSyncedServiceImpl(new
PipeConsensusRPCServiceProcessor(this, config.getPipe()));
try {
registerManager.register(rpcService);
} catch (StartupException e) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java
index 83c80574e32..66bded8d13d 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCService.java
@@ -29,8 +29,6 @@ import org.apache.iotdb.consensus.config.PipeConsensusConfig;
import org.apache.iotdb.consensus.pipe.thrift.PipeConsensusIService;
import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
-import org.apache.thrift.TBaseAsyncProcessor;
-
public class PipeConsensusRPCService extends ThriftService implements
PipeConsensusRPCServiceMBean {
private final TEndPoint thisNode;
@@ -48,15 +46,15 @@ public class PipeConsensusRPCService extends ThriftService
implements PipeConsen
}
@Override
- public void initAsyncedServiceImpl(Object pipeConsensusRPCServiceProcessor) {
+ public void initSyncedServiceImpl(Object pipeConsensusRPCServiceProcessor) {
this.pipeConsensusRPCServiceProcessor =
(PipeConsensusRPCServiceProcessor) pipeConsensusRPCServiceProcessor;
- super.initAsyncedServiceImpl(this.pipeConsensusRPCServiceProcessor);
+ super.initSyncedServiceImpl(this.pipeConsensusRPCServiceProcessor);
}
@Override
public void initTProcessor() {
- processor = new
PipeConsensusIService.AsyncProcessor<>(pipeConsensusRPCServiceProcessor);
+ processor = new
PipeConsensusIService.Processor<>(pipeConsensusRPCServiceProcessor);
}
@Override
@@ -64,20 +62,15 @@ public class PipeConsensusRPCService extends ThriftService
implements PipeConsen
try {
thriftServiceThread =
new ThriftServiceThread(
- (TBaseAsyncProcessor<?>) processor,
+ processor,
getID().getName(),
ThreadName.PIPE_CONSENSUS_RPC_PROCESSOR.getName(),
getBindIP(),
getBindPort(),
- config.getRpc().getRpcSelectorThreadNum(),
- config.getRpc().getRpcMinConcurrentClientNum(),
config.getRpc().getRpcMaxConcurrentClientNum(),
config.getRpc().getThriftServerAwaitTimeForStopService(),
new
PipeConsensusRPCServiceHandler(pipeConsensusRPCServiceProcessor),
config.getRpc().isRpcThriftCompressionEnabled(),
- config.getRpc().getConnectionTimeoutInMs(),
- config.getRpc().getThriftMaxFrameSize(),
- ThriftServiceThread.ServerType.SELECTOR,
ZeroCopyRpcTransportFactory.INSTANCE);
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
index e41fffb58e0..84a7cd8dbe3 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java
@@ -43,11 +43,10 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.AsyncIface {
+public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.Iface {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConsensusRPCServiceProcessor.class);
private final PipeConsensus pipeConsensus;
@@ -61,28 +60,19 @@ public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.A
}
@Override
- public void pipeConsensusTransfer(
- TPipeConsensusTransferReq req,
- AsyncMethodCallback<TPipeConsensusTransferResp> resultHandler) {
- try {
- TPipeConsensusTransferResp resp =
config.getConsensusPipeReceiver().receive(req);
- // we need to call onComplete by hand
- resultHandler.onComplete(resp);
- } catch (Exception e) {
- resultHandler.onError(e);
- }
+ public TPipeConsensusTransferResp
pipeConsensusTransfer(TPipeConsensusTransferReq req) {
+ return config.getConsensusPipeReceiver().receive(req);
}
// TODO: consider batch transfer
@Override
- public void pipeConsensusBatchTransfer(
- TPipeConsensusBatchTransferReq req,
- AsyncMethodCallback<TPipeConsensusBatchTransferResp> resultHandler)
- throws TException {}
+ public TPipeConsensusBatchTransferResp pipeConsensusBatchTransfer(
+ TPipeConsensusBatchTransferReq req) throws TException {
+ return new TPipeConsensusBatchTransferResp();
+ }
@Override
- public void setActive(TSetActiveReq req, AsyncMethodCallback<TSetActiveResp>
resultHandler)
- throws TException {
+ public TSetActiveResp setActive(TSetActiveReq req) throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId);
@@ -92,18 +82,15 @@ public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.A
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new TSetActiveResp(status));
- return;
+ return new TSetActiveResp(status);
}
impl.setActive(req.isActive);
- resultHandler.onComplete(new TSetActiveResp(RpcUtils.SUCCESS_STATUS));
+ return new TSetActiveResp(RpcUtils.SUCCESS_STATUS);
}
@Override
- public void notifyPeerToCreateConsensusPipe(
- TNotifyPeerToCreateConsensusPipeReq req,
- AsyncMethodCallback<TNotifyPeerToCreateConsensusPipeResp> resultHandler)
- throws TException {
+ public TNotifyPeerToCreateConsensusPipeResp notifyPeerToCreateConsensusPipe(
+ TNotifyPeerToCreateConsensusPipeReq req) throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId);
PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId);
@@ -114,8 +101,7 @@ public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.A
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new
TNotifyPeerToCreateConsensusPipeResp(status));
- return;
+ return new TNotifyPeerToCreateConsensusPipeResp(status);
}
TSStatus responseStatus;
try {
@@ -130,14 +116,12 @@ public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.A
responseStatus.setMessage(e.getMessage());
LOGGER.warn("Failed to create consensus pipe to target peer with req
{}", req, e);
}
- resultHandler.onComplete(new
TNotifyPeerToCreateConsensusPipeResp(responseStatus));
+ return new TNotifyPeerToCreateConsensusPipeResp(responseStatus);
}
@Override
- public void notifyPeerToDropConsensusPipe(
- TNotifyPeerToDropConsensusPipeReq req,
- AsyncMethodCallback<TNotifyPeerToDropConsensusPipeResp> resultHandler)
- throws TException {
+ public TNotifyPeerToDropConsensusPipeResp notifyPeerToDropConsensusPipe(
+ TNotifyPeerToDropConsensusPipeReq req) throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId);
PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId);
@@ -148,8 +132,7 @@ public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.A
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new TNotifyPeerToDropConsensusPipeResp(status));
- return;
+ return new TNotifyPeerToDropConsensusPipeResp(status);
}
TSStatus responseStatus;
try {
@@ -164,14 +147,12 @@ public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.A
responseStatus.setMessage(e.getMessage());
LOGGER.warn("Failed to drop consensus pipe to target peer with req {}",
req, e);
}
- resultHandler.onComplete(new
TNotifyPeerToDropConsensusPipeResp(responseStatus));
+ return new TNotifyPeerToDropConsensusPipeResp(responseStatus);
}
@Override
- public void checkConsensusPipeCompleted(
- TCheckConsensusPipeCompletedReq req,
- AsyncMethodCallback<TCheckConsensusPipeCompletedResp> resultHandler)
- throws TException {
+ public TCheckConsensusPipeCompletedResp checkConsensusPipeCompleted(
+ TCheckConsensusPipeCompletedReq req) throws TException {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId);
@@ -183,8 +164,7 @@ public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.A
LOGGER.error(message);
TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
status.setMessage(message);
- resultHandler.onComplete(new TCheckConsensusPipeCompletedResp(status,
true));
- return;
+ return new TCheckConsensusPipeCompletedResp(status, true);
}
TSStatus responseStatus;
boolean isCompleted;
@@ -203,7 +183,7 @@ public class PipeConsensusRPCServiceProcessor implements
PipeConsensusIService.A
true,
e);
}
- resultHandler.onComplete(new
TCheckConsensusPipeCompletedResp(responseStatus, isCompleted));
+ return new TCheckConsensusPipeCompletedResp(responseStatus, isCompleted);
}
public void handleExit() {}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
index e160e2dd7de..35727458b55 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftService.java
@@ -76,16 +76,9 @@ public abstract class ThriftService implements IService {
JMXService.deregisterMBean(mbeanName);
}
- boolean setSyncedImpl = false;
- boolean setAsyncedImpl = false;
+ public void initSyncedServiceImpl(Object serviceImpl) {}
- public void initSyncedServiceImpl(Object serviceImpl) {
- setSyncedImpl = true;
- }
-
- public void initAsyncedServiceImpl(Object serviceImpl) {
- setAsyncedImpl = true;
- }
+ public void initAsyncServiceImpl(Object serviceImpl) {}
public abstract void initTProcessor()
throws ClassNotFoundException,
@@ -114,10 +107,6 @@ public abstract class ThriftService implements IService {
try {
reset();
initTProcessor();
- if (!setSyncedImpl && !setAsyncedImpl) {
- throw new StartupException(
- getID().getName(), "At least one service implementation should be
set.");
- }
initThriftServiceThread();
thriftServiceThread.setThreadStopLatch(stopLatch);
thriftServiceThread.start();