This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch multi_leader_duplicate_entry_debug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 79ea36bdbf39f58f491c9b4ebc73928b320f5d5f Author: OneSizeFitQuorum <[email protected]> AuthorDate: Tue Nov 15 15:32:28 2022 +0800 add logs Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../multileader/logdispatcher/LogDispatcher.java | 27 ++++++++++++++++++++-- .../service/MultiLeaderRPCServiceProcessor.java | 8 +++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index f0b4791ea8..5aea9d6c7a 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -396,6 +396,16 @@ public class LogDispatcher { return batches; } + if (!batches.isEmpty() + && batches.getBatches().get(batches.getBatches().size() - 1).getSearchIndex() + == prev.getSearchIndex()) { + logger.error( + "{} : encounter duplicated request {} for pendingBatch {} from wal and queue", + impl.getThisNode().getGroupId(), + prev, + batches); + } + constructBatchIndexedFromConsensusRequest(prev, batches); iterator.remove(); releaseReservedMemory(prev); @@ -421,6 +431,14 @@ public class LogDispatcher { return batches; } } + if (batches.getBatches().get(batches.getBatches().size() - 1).getSearchIndex() + == current.getSearchIndex()) { + logger.error( + "{} : encounter duplicated request {} for pendingBatch {} from wal and queue", + impl.getThisNode().getGroupId(), + current, + batches); + } constructBatchIndexedFromConsensusRequest(current, batches); prev = current; // We might not be able to remove all the elements in the bufferedRequest in the @@ -478,18 +496,23 @@ public class LogDispatcher { IndexedConsensusRequest data = walEntryIterator.next(); if (data.getSearchIndex() < targetIndex) { // if the index of request is smaller than currentIndex, then continue - logger.warn( + logger.error( "search for one Entry which index is {}, but find a smaller one, index : {}", targetIndex, data.getSearchIndex()); continue; } else if (data.getSearchIndex() > targetIndex) { - logger.warn( + logger.error( "search for one Entry which index is {}, but find a larger one, index : {}", targetIndex, data.getSearchIndex()); if (data.getSearchIndex() >= maxIndex) { // if the index of request is larger than maxIndex, then finish + logger.error( + "break constructBatchFromWAL({}, {}) when encounter request {}", + currentIndex, + maxIndex, + data); break; } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java index 67bceadd12..1f9f80584c 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java @@ -102,6 +102,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ BatchIndexedConsensusRequest requestsInThisBatch = new BatchIndexedConsensusRequest(req.peerId); // We use synchronized to ensure atomicity of executing multiple logs + long searchIndex = -1; for (TLogBatch batch : req.getBatches()) { requestsInThisBatch.add( impl.buildIndexedConsensusRequestForRemoteRequest( @@ -112,6 +113,13 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ ? MultiLeaderConsensusRequest::new : ByteBufferConsensusRequest::new) .collect(Collectors.toList()))); + if (searchIndex != -1 && searchIndex == batch.searchIndex) { + logger.error( + "execute TSyncLogReq for {} with duplicated searchIndex {}", + req.consensusGroupId, + req); + } + searchIndex = batch.getSearchIndex(); } TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch); logger.debug(
