This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch test_wal_sync in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1d5b65520762fa5787146705884515dd562f435a Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Jun 27 18:07:05 2022 +0800 change wal sync to iterator --- .../multileader/logdispatcher/LogDispatcher.java | 18 +++++++++++++++++- .../java/org/apache/iotdb/db/wal/node/WALNode.java | 2 +- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index 1d651a0ba5..9d592d004e 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -137,6 +137,9 @@ public class LogDispatcher { (ConsensusReqReader) impl.getStateMachine().read(new GetConsensusReqReaderPlan()); private volatile boolean stopped = false; + private ConsensusReqReader.ReqIterator walEntryiterator; + private long iteratorIndex = 1; + public LogDispatcherThread(Peer peer, MultiLeaderConfig config) { this.peer = peer; this.config = config; @@ -146,6 +149,7 @@ public class LogDispatcher { new IndexController( impl.getStorageDir(), Utils.fromTEndPointToString(peer.getEndpoint()), false); this.syncStatus = new SyncStatus(controller, config); + this.walEntryiterator = reader.getReqIterator(iteratorIndex); } public IndexController getController() { @@ -283,10 +287,22 @@ public class LogDispatcher { private long constructBatchFromWAL( long currentIndex, long maxIndex, List<TLogBatch> logBatches) { + if (iteratorIndex != currentIndex) { + walEntryiterator.skipTo(currentIndex); + iteratorIndex = currentIndex; + } + while (currentIndex < maxIndex && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) { + try { + walEntryiterator.waitForNextReady(); + } catch (InterruptedException e) { + e.printStackTrace(); + } // TODO iterator - IConsensusRequest data = reader.getReq(currentIndex++); + IConsensusRequest data = walEntryiterator.next(); + iteratorIndex++; + currentIndex++; if (data != null) { logBatches.add(new TLogBatch(data.serializeToByteBuffer())); } diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java index 5d85114446..8f207cee32 100644 --- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java +++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java @@ -237,7 +237,7 @@ public class WALNode implements IWALNode { } } - logger.debug( + logger.info( "Start deleting outdated wal files for wal node-{}, the first valid version id is {}, and the safely deleted search index is {}.", identifier, firstValidVersionId,
