This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0808_test_exp1_parallel by
this push:
new dc06fca1b9 add another constraint to keep the write order as far as
possible
dc06fca1b9 is described below
commit dc06fca1b9f270b304c8145182b13d2048b98e84
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Tue Aug 9 15:01:59 2022 +0800
add another constraint to keep the write order as far as possible
---
.../statemachine/DataRegionStateMachine.java | 21 +++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index b182a811cf..f6f1628d2d 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -155,12 +155,21 @@ public class DataRegionStateMachine extends
BaseStateMachine {
boolean timeout =
!queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS,
TimeUnit.MILLISECONDS);
if (timeout) {
- logger.info(
- "waiting target request timeout. current index: {}, target
index: {}",
- insertNodeWrapper.getStartSyncIndex(),
- nextSyncIndex);
- requestCache.remove(insertNodeWrapper);
- break;
+ if (requestCache.peek().getStartSyncIndex() ==
insertNodeWrapper.getStartSyncIndex()) {
+ // current thread holds the peek request, thus it can write the
peek immediately.
+ logger.info(
+ "waiting target request timeout. current index: {}, target
index: {}",
+ insertNodeWrapper.getStartSyncIndex(),
+ nextSyncIndex);
+ requestCache.remove(insertNodeWrapper);
+ break;
+ } else {
+ // although the timeout is triggered, current thread cannot
write its request
+ // because there should be some other thread who holds the peek
request.
+ // And current thread should signal all the other threads to let
the thread
+ // who holds the peek request to execute write operation.
+ queueSortCondition.signalAll();
+ }
}
} catch (InterruptedException e) {
logger.warn(