This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch ml_0729_test_exp1_no_write in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f52e099f36f39d26eb75cf954959d43b3d7eed61 Author: Jinrui.Zhang <[email protected]> AuthorDate: Fri Aug 5 15:07:20 2022 +0800 disable write in DataRegion and add wait when offer to queue --- .../multileader/MultiLeaderServerImpl.java | 3 ++- .../multileader/logdispatcher/LogDispatcher.java | 29 +++++++++++++++------- .../statemachine/DataRegionStateMachine.java | 6 ++--- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java index e610ff78f1..973a31272d 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java @@ -125,7 +125,8 @@ public class MultiLeaderServerImpl { indexedConsensusRequest.getSearchIndex()); } // TODO wal and memtable - TSStatus result = stateMachine.write(indexedConsensusRequest); + // TSStatus result = stateMachine.write(indexedConsensusRequest); + TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); StepTracker.trace("stateMachineWrite", startTimeAfterLock, System.nanoTime()); long offerStartTime = System.nanoTime(); if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index c951ec83c3..69bc57b294 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -115,16 +115,27 @@ public class LogDispatcher { "{}: Push a log to the queue, where the queue length is {}", impl.getThisNode().getGroupId(), thread.getPendingRequest().size()); - if (thread - .getPendingRequest() - .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) { - thread.countQueue(request.getSearchIndex()); - } else { - logger.debug( - "{}: Log queue of {} is full, ignore the log to this node", - impl.getThisNode().getGroupId(), - thread.getPeer()); + long putToQueueStartTime = System.nanoTime(); + try { + thread + .getPendingRequest() + .put(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex())); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + StepTracker.trace("putToQueueWaitingTime", putToQueueStartTime, System.nanoTime()); } + // if (thread + // .getPendingRequest() + // .offer(new IndexedConsensusRequest(serializedRequests, + // request.getSearchIndex()))) { + // thread.countQueue(request.getSearchIndex()); + // } else { + // logger.debug( + // "{}: Log queue of {} is full, ignore the log to this node", + // impl.getThisNode().getGroupId(), + // thread.getPeer()); + // } }); } diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index 8c63bcf173..7342976f4a 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -185,9 +185,9 @@ public class DataRegionStateMachine extends BaseStateMachine { StepTracker.trace("followerWritePrepare", 25, prepareStartTime, System.nanoTime()); long writeStartTime = System.nanoTime(); if (insertNodeWrapper != null) { - for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) { - statuses.add(write(insertNode)); - } + // for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) { + // statuses.add(write(insertNode)); + // } insertNodeWrapper.resultHandler.onComplete(new TSyncLogRes(statuses)); } StepTracker.trace("followerWriteInsert", 25, writeStartTime, System.nanoTime());
