This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch multileader_restart_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/multileader_restart_test by
this push:
new 6cbb66f9e5 disable write for sync log
6cbb66f9e5 is described below
commit 6cbb66f9e53a2033a0d915ba70397330b99cb772
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Fri Jul 8 14:48:02 2022 +0800
disable write for sync log
---
.../service/MultiLeaderRPCServiceProcessor.java | 65 ++++++++++------------
1 file changed, 28 insertions(+), 37 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 8c54743097..e9ef73d72b 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -19,27 +19,17 @@
package org.apache.iotdb.consensus.multileader.service;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.StepTracker;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
-import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
import
org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
-import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIService.AsyncIface {
private final Logger logger =
LoggerFactory.getLogger(MultiLeaderRPCServiceProcessor.class);
@@ -55,33 +45,34 @@ public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIServ
throws TException {
long startTime = System.nanoTime();
try {
- ConsensusGroupId groupId =
-
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
- if (impl == null) {
- String message =
- String.format(
- "Unexpected consensusGroupId %s for TSyncLogReq which size is
%s",
- groupId, req.getBatches().size());
- logger.error(message);
- TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- status.setMessage(message);
- resultHandler.onComplete(new
TSyncLogRes(Collections.singletonList(status)));
- return;
- }
- List<TSStatus> statuses = new ArrayList<>();
- // We use synchronized to ensure atomicity of executing multiple logs
- synchronized (impl.getStateMachine()) {
- for (TLogBatch batch : req.getBatches()) {
- statuses.add(
- impl.getStateMachine()
- .write(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- new ByteBufferConsensusRequest(batch.data))));
- }
- }
- logger.debug("Execute TSyncLogReq for {} with result {}",
req.consensusGroupId, statuses);
- resultHandler.onComplete(new TSyncLogRes(statuses));
+ // ConsensusGroupId groupId =
+ //
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ // MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ // if (impl == null) {
+ // String message =
+ // String.format(
+ // "Unexpected consensusGroupId %s for TSyncLogReq which
size is %s",
+ // groupId, req.getBatches().size());
+ // logger.error(message);
+ // TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ // status.setMessage(message);
+ // resultHandler.onComplete(new
TSyncLogRes(Collections.singletonList(status)));
+ // return;
+ // }
+ // List<TSStatus> statuses = new ArrayList<>();
+ // // We use synchronized to ensure atomicity of executing multiple
logs
+ // synchronized (impl.getStateMachine()) {
+ // for (TLogBatch batch : req.getBatches()) {
+ // statuses.add(
+ // impl.getStateMachine()
+ // .write(
+ //
impl.buildIndexedConsensusRequestForRemoteRequest(
+ // new
ByteBufferConsensusRequest(batch.data))));
+ // }
+ // }
+ // logger.debug("Execute TSyncLogReq for {} with result {}",
req.consensusGroupId,
+ // statuses);
+ resultHandler.onComplete(new TSyncLogRes());
} catch (Exception e) {
resultHandler.onError(e);
} finally {