This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 88ba093562b PipeConsensus: keep consistent with IoTConsensus in WAL,
lastCache and other scenarios (#12822)
88ba093562b is described below
commit 88ba093562b92c3a6f98b47b9b1cb7e53e91c646
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Jun 28 01:52:27 2024 -0500
PipeConsensus: keep consistent with IoTConsensus in WAL, lastCache and
other scenarios (#12822)
---
.../org/apache/iotdb/db/conf/IoTDBStartCheck.java | 5 +++-
.../pipeconsensus/PipeConsensusReceiver.java | 2 ++
.../plan/planner/plan/node/PlanNode.java | 10 -------
.../plan/node/pipe/PipeEnrichedInsertNode.java | 5 ----
.../plan/planner/plan/node/write/InsertNode.java | 34 ++++++++++++++++------
.../db/storageengine/dataregion/DataRegion.java | 16 +++++-----
.../storageengine/dataregion/wal/WALManager.java | 27 ++++++++++++++---
.../dataregion/wal/recover/WALNodeRecoverTask.java | 31 +++++++++++++++-----
8 files changed, 85 insertions(+), 45 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
index ca08c7bbb23..989a3328aa7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
@@ -216,7 +216,10 @@ public class IoTDBStartCheck {
properties = systemPropertiesHandler.read();
if (systemPropertiesHandler.isFirstStart()) {
- if
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+ if
((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+ || config
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.IOT_CONSENSUS_V2))
&& config.getWalMode().equals(WALMode.DISABLE)) {
throw new ConfigurationException(
"Configuring the WALMode as disable is not supported under
IoTConsensus");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 9d45f96062c..87f3da19285 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -285,6 +285,7 @@ public class PipeConsensusReceiver {
Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
.orElseThrow(() -> new
ConsensusGroupNotExistException(consensusGroupId));
final InsertNode insertNode = req.getInsertNode();
+ insertNode.markAsGeneratedByRemoteConsensusLeader();
insertNode.setProgressIndex(
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
return new
TPipeConsensusTransferResp(impl.writeOnFollowerReplica(insertNode));
@@ -297,6 +298,7 @@ public class PipeConsensusReceiver {
Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
.orElseThrow(() -> new
ConsensusGroupNotExistException(consensusGroupId));
final InsertNode insertNode = req.convertToInsertNode();
+ insertNode.markAsGeneratedByRemoteConsensusLeader();
insertNode.setProgressIndex(
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
return new
TPipeConsensusTransferResp(impl.writeOnFollowerReplica(insertNode));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
index 29dd52df29a..2bbf723fd30 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
@@ -49,7 +49,6 @@ public abstract class PlanNode implements IConsensusRequest {
protected PlanNodeId id;
protected boolean isGeneratedByPipe = false;
- protected boolean isGeneratedByRemoteConsensusLeader = false;
protected PlanNode(PlanNodeId id) {
requireNonNull(id, "id is null");
@@ -68,19 +67,10 @@ public abstract class PlanNode implements IConsensusRequest
{
return isGeneratedByPipe;
}
- public boolean isGeneratedByRemoteConsensusLeader() {
- return isGeneratedByRemoteConsensusLeader;
- }
-
public void markAsGeneratedByPipe() {
isGeneratedByPipe = true;
}
- @Override
- public void markAsGeneratedByRemoteConsensusLeader() {
- isGeneratedByRemoteConsensusLeader = true;
- }
-
public abstract List<PlanNode> getChildren();
public abstract void addChild(PlanNode child);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index 855fe79de5d..6ea566053ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -250,11 +250,6 @@ public class PipeEnrichedInsertNode extends InsertNode {
return insertNode.getMinTime();
}
- @Override
- public boolean isSyncFromLeaderWhenUsingIoTConsensus() {
- return insertNode.isSyncFromLeaderWhenUsingIoTConsensus();
- }
-
@Override
public void markFailedMeasurement(int index) {
insertNode.markFailedMeasurement(index);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 054770c30ed..a572f689e8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -24,7 +24,10 @@ import
org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
@@ -48,6 +51,8 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
/** this insert node doesn't need to participate in iot consensus */
public static final long NO_CONSENSUS_INDEX =
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
+ private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+
/**
* if use id table, this filed is id form of device path <br>
* if not, this filed is device path<br>
@@ -73,6 +78,8 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
*/
protected long searchIndex = NO_CONSENSUS_INDEX;
+ protected boolean isGeneratedByRemoteConsensusLeader = false;
+
/** Physical address of data region after splitting */
protected TRegionReplicaSet dataRegionReplicaSet;
@@ -169,6 +176,24 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
this.searchIndex = searchIndex;
}
+ public boolean isGeneratedByRemoteConsensusLeader() {
+ switch (config.getDataRegionConsensusProtocolClass()) {
+ case ConsensusFactory.IOT_CONSENSUS:
+ case ConsensusFactory.IOT_CONSENSUS_V2:
+ case ConsensusFactory.FAST_IOT_CONSENSUS:
+ case ConsensusFactory.RATIS_CONSENSUS:
+ return isGeneratedByRemoteConsensusLeader;
+ case ConsensusFactory.SIMPLE_CONSENSUS:
+ return false;
+ }
+ return false;
+ }
+
+ @Override
+ public void markAsGeneratedByRemoteConsensusLeader() {
+ isGeneratedByRemoteConsensusLeader = true;
+ }
+
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
throw new NotImplementedException("serializeAttributes of InsertNode is
not implemented");
@@ -231,15 +256,6 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
public abstract long getMinTime();
- /**
- * Notice: Call this method ONLY when using IOT_CONSENSUS, other consensus
protocol cannot
- * distinguish whether the insertNode sync from leader by this method.
- * isSyncFromLeaderWhenUsingIoTConsensus == true means this node is a
follower
- */
- public boolean isSyncFromLeaderWhenUsingIoTConsensus() {
- return searchIndex == ConsensusReqReader.DEFAULT_SEARCH_INDEX;
- }
-
// region partial insert
@TestOnly
public void markFailedMeasurement(int index) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 9f7252bd98c..733eca443d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1097,8 +1097,7 @@ public class DataRegion implements IDataRegionForQuery {
private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
- ||
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
- && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
+ || node.isGeneratedByRemoteConsensusLeader()) {
// disable updating last cache on follower
return;
}
@@ -1142,8 +1141,7 @@ public class DataRegion implements IDataRegionForQuery {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- if
((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
- && insertRowNode.isSyncFromLeaderWhenUsingIoTConsensus())) {
+ if (insertRowNode.isGeneratedByRemoteConsensusLeader()) {
return tsFileProcessor;
}
// disable updating last cache on follower
@@ -1242,8 +1240,7 @@ public class DataRegion implements IDataRegionForQuery {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- if
((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
- && insertRowsNode.isSyncFromLeaderWhenUsingIoTConsensus())) {
+ if (insertRowsNode.isGeneratedByRemoteConsensusLeader()) {
return;
}
// disable updating last cache on follower
@@ -3343,8 +3340,7 @@ public class DataRegion implements IDataRegionForQuery {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]);
if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
- if
((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
- &&
insertRowsOfOneDeviceNode.isSyncFromLeaderWhenUsingIoTConsensus())) {
+ if (insertRowsOfOneDeviceNode.isGeneratedByRemoteConsensusLeader()) {
return;
}
// disable updating last cache on follower
@@ -3623,7 +3619,9 @@ public class DataRegion implements IDataRegionForQuery {
private void acquireDirectBufferMemory() throws DataRegionException {
long acquireDirectBufferMemCost = 0;
- if
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
{
+ if
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+ ||
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS)
+ ||
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2))
{
acquireDirectBufferMemCost = config.getWalBufferSize();
} else if (config
.getDataRegionConsensusProtocolClass()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
index f3911c9b196..37f6330f2b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
@@ -69,7 +69,11 @@ public class WALManager implements IService {
private final AtomicLong totalFileNum = new AtomicLong();
private WALManager() {
- if
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
{
+ if
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+ ||
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)
+ || config
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.FAST_IOT_CONSENSUS)) {
walNodesManager = new FirstCreateStrategy();
} else if (config.getMaxWalNodesNum() == 0) {
walNodesManager = new ElasticStrategy();
@@ -80,6 +84,12 @@ public class WALManager implements IService {
public static String getApplicantUniqueId(String storageGroupName, boolean
sequence) {
return
config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+ || config
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.IOT_CONSENSUS_V2)
+ || config
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.FAST_IOT_CONSENSUS)
? storageGroupName
: storageGroupName
+ IoTDBConstant.FILE_NAME_SEPARATOR
@@ -95,11 +105,17 @@ public class WALManager implements IService {
return walNodesManager.applyForWALNode(applicantUniqueId);
}
- /** WAL node will be registered only when using iot consensus protocol. */
+ /** WAL node will be registered only when using iot series consensus
protocol. */
public void registerWALNode(
String applicantUniqueId, String logDirectory, long startFileVersion,
long startSearchIndex) {
if (config.getWalMode() == WALMode.DISABLE
- ||
!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
{
+ ||
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+ && !config
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.IOT_CONSENSUS_V2)
+ && !config
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.FAST_IOT_CONSENSUS))) {
return;
}
@@ -114,7 +130,10 @@ public class WALManager implements IService {
||
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
&& !config
.getDataRegionConsensusProtocolClass()
- .equals(ConsensusFactory.IOT_CONSENSUS_V2))) {
+ .equals(ConsensusFactory.IOT_CONSENSUS_V2)
+ && !config
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.FAST_IOT_CONSENSUS))) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
index 07a06448359..1419f9c6136 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
@@ -107,13 +107,7 @@ public class WALNodeRecoverTask implements Runnable {
}
try {
- if
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
{
- // delete this wal node folder
- FileUtils.deleteFileOrDirectory(logDirectory);
- logger.info(
- "Successfully recover WAL node in the directory {}, so delete
these wal files.",
- logDirectory);
- } else {
+ if
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
{
// delete checkpoint info to avoid repeated recover
File[] checkpointFiles =
CheckpointFileUtils.listAllCheckpointFiles(logDirectory);
for (File checkpointFile : checkpointFiles) {
@@ -133,6 +127,29 @@ public class WALNodeRecoverTask implements Runnable {
logger.info(
"Successfully recover WAL node in the directory {}, add this node
to WALManger.",
logDirectory);
+ } else {
+ // delete this wal node folder
+ FileUtils.deleteFileOrDirectory(logDirectory);
+ logger.info(
+ "Successfully recover WAL node in the directory {}, so delete
these wal files.",
+ logDirectory);
+ }
+
+ // PipeConsensus will not only delete WAL node folder, but also register
WAL node.
+ if
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS)
+ || config
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.IOT_CONSENSUS_V2)) {
+ // register wal node
+ WALManager.getInstance()
+ .registerWALNode(
+ logDirectory.getName(),
+ logDirectory.getAbsolutePath(),
+ lastVersionId + 1,
+ lastSearchIndex);
+ logger.info(
+ "Successfully recover WAL node in the directory {}, add this node
to WALManger.",
+ logDirectory);
}
} finally {
allNodesRecoveredLatch.countDown();