This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2f7069baf2f Fix sync stuck problem of IoTConsensus and WAL (#12955)
2f7069baf2f is described below
commit 2f7069baf2ff5f58e89063480af57e7f072b43c2
Author: Li Yu Heng <[email protected]>
AuthorDate: Wed Jul 17 21:38:35 2024 +0800
Fix sync stuck problem of IoTConsensus and WAL (#12955)
* now i'm master of WALNode
* improve
* tan review
* rename
---
.../consensus/iot/log/ConsensusReqReader.java | 2 +-
.../dataregion/DataExecutionVisitor.java | 6 +++
.../dataregion/DataRegionStateMachine.java | 5 ++-
.../plan/planner/plan/node/PlanNodeType.java | 2 +
.../ContinuousSameSearchIndexSeparatorNode.java | 43 ++++++++++++++++++++++
.../db/storageengine/dataregion/DataRegion.java | 14 +++++++
.../dataregion/wal/buffer/WALEntry.java | 3 ++
.../dataregion/wal/buffer/WALEntryType.java | 1 +
.../dataregion/wal/buffer/WALInfoEntry.java | 1 +
.../dataregion/wal/node/IWALNode.java | 4 ++
.../dataregion/wal/node/WALFakeNode.java | 7 ++++
.../storageengine/dataregion/wal/node/WALNode.java | 15 ++++++--
12 files changed, 98 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/log/ConsensusReqReader.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/log/ConsensusReqReader.java
index d7e9f246318..6959b56b674 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/log/ConsensusReqReader.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/log/ConsensusReqReader.java
@@ -46,7 +46,7 @@ public interface ConsensusReqReader {
/** This iterator provides blocking and non-blocking interfaces to read
consensus request. */
interface ReqIterator {
- // Like {@link Iterator#hasNext()}
+ /** Like {@link Iterator#hasNext()} */
boolean hasNext();
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 14b2229ed78..efd9859588c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -60,6 +60,7 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) {
try {
dataRegion.insert(node);
+ dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (OutOfTTLException e) {
LOGGER.warn("Error in executing plan node: {}, caused by {}", node,
e.getMessage());
@@ -77,6 +78,7 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion
dataRegion) {
try {
dataRegion.insertTablet(node);
+ dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (OutOfTTLException e) {
LOGGER.warn("Error in executing plan node: {}, caused by {}", node,
e.getMessage());
@@ -113,6 +115,7 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) {
try {
dataRegion.insert(node);
+ dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (WriteProcessRejectException e) {
LOGGER.warn("Reject in executing plan node: {}, caused by {}", node,
e.getMessage());
@@ -146,6 +149,7 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
public TSStatus visitInsertMultiTablets(InsertMultiTabletsNode node,
DataRegion dataRegion) {
try {
dataRegion.insertTablets(node);
+ dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (BatchProcessException e) {
LOGGER.warn("Batch failure in executing a InsertMultiTabletsNode.");
@@ -177,6 +181,7 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
InsertRowsOfOneDeviceNode node, DataRegion dataRegion) {
try {
dataRegion.insert(node);
+ dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (WriteProcessRejectException e) {
LOGGER.warn("Reject in executing plan node: {}, caused by {}", node,
e.getMessage());
@@ -235,6 +240,7 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
path, node.getDeleteStartTime(), node.getDeleteEndTime(),
node.getSearchIndex());
}
}
+ dataRegion.insertSeparatorToWAL();
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(node);
return StatusUtils.OK;
} catch (IOException | IllegalPathException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index f60ae0bf9a4..2cc420e1f86 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache;
import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
@@ -148,9 +149,11 @@ public class DataRegionStateMachine extends
BaseStateMachine {
for (IConsensusRequest req : indexedRequest.getRequests()) {
// PlanNode in IndexedConsensusRequest should always be InsertNode
PlanNode planNode = getPlanNode(req);
+ if (planNode instanceof SearchNode) {
+ ((SearchNode)
planNode).setSearchIndex(indexedRequest.getSearchIndex());
+ }
if (planNode instanceof InsertNode) {
InsertNode innerNode = (InsertNode) planNode;
- innerNode.setSearchIndex(indexedRequest.getSearchIndex());
insertNodes.add(innerNode);
} else if (indexedRequest.getRequests().size() == 1) {
// If the planNode is not InsertNode, it is expected that the
IndexedConsensusRequest only
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index c1432bf1451..061f22f171d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -222,6 +222,8 @@ public enum PlanNodeType {
TIMESERIES_REGION_SCAN((short) 94),
REGION_MERGE((short) 95),
DEVICE_SCHEMA_FETCH_SCAN((short) 96),
+
+ CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR((short) 97),
;
public static final int BYTES = Short.BYTES;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ContinuousSameSearchIndexSeparatorNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ContinuousSameSearchIndexSeparatorNode.java
new file mode 100644
index 00000000000..78bf312bf9e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ContinuousSameSearchIndexSeparatorNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+
+/**
+ * For IoTConsensus sync. See <a href="https://github.com/apache/iotdb/pull/12955">github pull
+ * request</a> for details.
+ */
+public class ContinuousSameSearchIndexSeparatorNode implements WALEntryValue {
+
+ @Override
+ public void serializeToWAL(IWALByteBufferView buffer) {
+
buffer.putShort(PlanNodeType.CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR.getNodeType());
+ // search index is always -1
+ buffer.putLong(-1);
+ }
+
+ @Override
+ public int serializedSize() {
+ return Short.BYTES + Long.BYTES;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index de15317527d..1396ab501f4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -53,6 +53,7 @@ import
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -2163,6 +2164,19 @@ public class DataRegion implements IDataRegionForQuery {
return walFlushListeners;
}
+ /**
+ * For IoTConsensus sync. See <a href="https://github.com/apache/iotdb/pull/12955">github pull
+ * request</a> for details.
+ */
+ public void insertSeparatorToWAL() {
+ getWALNode()
+ .ifPresent(
+ walNode ->
+ walNode.log(
+ TsFileProcessor.MEMTABLE_NOT_EXIST,
+ new ContinuousSameSearchIndexSeparatorNode()));
+ }
+
private boolean canSkipDelete(
TsFileResource tsFileResource,
Set<PartialPath> devicePaths,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
index dff07dae0ef..f9ea8fdb333 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.storageengine.dataregion.wal.buffer;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -71,6 +72,8 @@ public abstract class WALEntry implements SerializedSize {
this.type = WALEntryType.DELETE_DATA_NODE;
} else if (value instanceof Checkpoint) {
this.type = WALEntryType.MEMORY_TABLE_CHECKPOINT;
+ } else if (value instanceof ContinuousSameSearchIndexSeparatorNode) {
+ this.type = WALEntryType.CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE;
} else {
throw new RuntimeException("Unknown WALEntry type");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
index 5a2785798f0..914a2e90663 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
@@ -40,6 +40,7 @@ public enum WALEntryType {
MEMORY_TABLE_CHECKPOINT((byte) 7),
/** {@link
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode} */
INSERT_ROWS_NODE((byte) 8),
+ CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE((byte) 9),
// endregion
// region signal entry type
// signal wal buffer has been closed
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
index 966361857f5..8b327b4880b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALInfoEntry.java
@@ -76,6 +76,7 @@ public class WALInfoEntry extends WALEntry {
case INSERT_ROWS_NODE:
case DELETE_DATA_NODE:
case MEMORY_TABLE_SNAPSHOT:
+ case CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE:
value.serializeToWAL(buffer);
break;
case MEMORY_TABLE_CHECKPOINT:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
index 7e10c3a70c6..2baf45f3ae5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/IWALNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.node;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -44,6 +45,9 @@ public interface IWALNode extends FlushListener,
AutoCloseable, ConsensusReqRead
/** Log DeleteDataNode. */
WALFlushListener log(long memTableId, DeleteDataNode deleteDataNode);
+ /** Log ContinuousSameSearchIndexSeparatorNode. */
+ WALFlushListener log(long memTableId, ContinuousSameSearchIndexSeparatorNode
separatorNode);
+
/** Callback when memTable created. */
void onMemTableCreated(IMemTable memTable, String targetTsFile);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
index 2902a6dcae9..38b69f1162b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALFakeNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.node;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -67,6 +68,12 @@ public class WALFakeNode implements IWALNode {
return getResult();
}
+ @Override
+ public WALFlushListener log(
+ long memTableId, ContinuousSameSearchIndexSeparatorNode separatorNode) {
+ return getResult();
+ }
+
private WALFlushListener getResult() {
switch (status) {
case SUCCESS:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 79d5d26673b..755de60016b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -167,6 +168,13 @@ public class WALNode implements IWALNode {
return log(walEntry);
}
+ @Override
+ public WALFlushListener log(
+ long memTableId, ContinuousSameSearchIndexSeparatorNode separatorNode) {
+ WALEntry walEntry = new WALInfoEntry(memTableId, separatorNode);
+ return log(walEntry);
+ }
+
private WALFlushListener log(WALEntry walEntry) {
buffer.write(walEntry);
@@ -709,7 +717,8 @@ public class WALNode implements IWALNode {
buffer.clear();
if (currentIndex == targetIndex) {
tmpNodes.add(new IoTConsensusRequest(buffer));
- } else { // different search index, all slices found
+ } else {
+ // different search index, all slices found
if (!tmpNodes.isEmpty()) {
insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
tmpNodes = new ArrayList<>();
@@ -720,8 +729,8 @@ public class WALNode implements IWALNode {
targetIndex = currentIndex;
}
}
- } else if (!tmpNodes
- .isEmpty()) { // next entry doesn't need to be searched, all
slices found
+ } else if (!tmpNodes.isEmpty()) {
+ // next entry doesn't need to be searched, all slices found
insertNodes.add(new IndexedConsensusRequest(targetIndex,
tmpNodes));
targetIndex++;
tmpNodes = new ArrayList<>();