This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/migration_add_wait_sync in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c5098b4a70465de8fa1f12f3b533c0c8601eaa4d Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Oct 26 16:26:06 2022 +0800 tmp save --- .../service/MultiLeaderRPCServiceProcessor.java | 9 +++++++++ .../org/apache/iotdb/db/mpp/plan/TestRPCClient.java | 21 ++++++++++++++++++--- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java index 20b6aad421..4592756100 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java @@ -238,6 +238,15 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); MultiLeaderServerImpl impl = consensus.getImpl(groupId); + if (impl == null) { + String message = + String.format("unexpected consensusGroupId %s for waitSyncLogComplete request", groupId); + logger.error(message); + TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + status.setMessage(message); + resultHandler.onComplete(new TWaitSyncLogCompleteRes(true, 0, 0)); + return; + } long searchIndex = impl.getIndex(); long safeIndex = impl.getCurrentSafelyDeletedSearchIndex(); resultHandler.onComplete( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java index 4a5b4169fe..cda96d3379 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java @@ -33,6 +33,8 @@ import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq; import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes; import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadReq; import org.apache.iotdb.consensus.multileader.thrift.TTriggerSnapshotLoadRes; +import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteReq; +import org.apache.iotdb.consensus.multileader.thrift.TWaitSyncLogCompleteRes; import org.apache.iotdb.db.client.DataNodeClientPoolFactory; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; @@ -62,11 +64,24 @@ public class TestRPCClient { public static void main(String args[]) { TestRPCClient client = new TestRPCClient(); - // client.removeRegionPeer(); - client.addPeer(); + client.removeRegionPeer(); +// client.testWaitSyncLog(); // client.loadSnapshot(); } + private void testWaitSyncLog() { + try (SyncMultiLeaderServiceClient client = + syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40012))) { + TWaitSyncLogCompleteRes res = + client.waitSyncLogComplete( + new TWaitSyncLogCompleteReq(new DataRegionId(1).convertToTConsensusGroupId())); + System.out.printf("%s, %d, %d",res.complete, res.searchIndex, res.safeIndex); + } catch (IOException | TException e) { + System.out.println("Error: " + e.getMessage()); + throw new RuntimeException(e); + } + } + private void loadSnapshot() { try (SyncMultiLeaderServiceClient client = syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40011))) { @@ -96,7 +111,7 @@ public class TestRPCClient { try (SyncDataNodeInternalServiceClient client = INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) { client.removeRegionPeer( - new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3))); + new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation3(3))); } catch (IOException | TException e) { throw new RuntimeException(e); }
