This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d18f6b91c3d Fix IoTConsensus sync stuck problem (#12856)
d18f6b91c3d is described below
commit d18f6b91c3d564e63c06b6ffd0d80379b36804d8
Author: Li Yu Heng <[email protected]>
AuthorDate: Wed Jul 10 01:06:21 2024 +0800
Fix IoTConsensus sync stuck problem (#12856)
---
.../dataregion/DataRegionStateMachine.java | 2 +-
.../planner/plan/node/write/DeleteDataNode.java | 18 +------
.../plan/planner/plan/node/write/InsertNode.java | 22 +-------
.../plan/planner/plan/node/write/SearchNode.java | 51 ++++++++++++++++++
.../db/storageengine/dataregion/DataRegion.java | 60 ++++++++++++----------
.../dataregion/memtable/TsFileProcessor.java | 2 +
.../storageengine/dataregion/wal/node/WALNode.java | 16 ++++++
.../dataregion/wal/recover/WALNodeRecoverTask.java | 45 +++++++---------
...{WALRecoverWriter.java => WALRepairWriter.java} | 8 +--
.../compaction/AbstractCompactionTest.java | 3 ++
...eCrossSpaceCompactionWithFastPerformerTest.java | 5 ++
...verWriterTest.java => WALRepairWriterTest.java} | 27 ++++------
12 files changed, 147 insertions(+), 112 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 0f949fd4904..f60ae0bf9a4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -289,7 +289,7 @@ public class DataRegionStateMachine extends
BaseStateMachine {
@Override
public DataSet read(IConsensusRequest request) {
if (request instanceof GetConsensusReqReaderPlan) {
- return region.getWALNode();
+ return
region.getWALNode().orElseThrow(UnsupportedOperationException::new);
} else {
FragmentInstance fragmentInstance;
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 93eb2a873ab..54f20728de2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -53,9 +53,8 @@ import java.util.Objects;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-import static
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode.NO_CONSENSUS_INDEX;
-public class DeleteDataNode extends WritePlanNode implements WALEntryValue {
+public class DeleteDataNode extends SearchNode implements WALEntryValue {
/** byte: type, integer: pathList.size(), long: deleteStartTime,
deleteEndTime, searchIndex */
private static final int FIXED_SERIALIZED_SIZE = Short.BYTES + Integer.BYTES
+ Long.BYTES * 3;
@@ -63,12 +62,6 @@ public class DeleteDataNode extends WritePlanNode implements
WALEntryValue {
private final long deleteStartTime;
private final long deleteEndTime;
- /**
- * this index is used by wal search, its order should be protected by the
upper layer, and the
- * value should start from 1
- */
- protected long searchIndex = NO_CONSENSUS_INDEX;
-
private TRegionReplicaSet regionReplicaSet;
public DeleteDataNode(
@@ -104,15 +97,6 @@ public class DeleteDataNode extends WritePlanNode
implements WALEntryValue {
return deleteEndTime;
}
- public long getSearchIndex() {
- return searchIndex;
- }
-
- /** Search index should start from 1 */
- public void setSearchIndex(long searchIndex) {
- this.searchIndex = searchIndex;
- }
-
@Override
public List<PlanNode> getChildren() {
return new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index a572f689e8c..161b6ac9d2e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -25,11 +25,9 @@ import
org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
-import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
@@ -46,10 +44,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
-public abstract class InsertNode extends WritePlanNode implements
ComparableConsensusRequest {
-
- /** this insert node doesn't need to participate in iot consensus */
- public static final long NO_CONSENSUS_INDEX =
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
+public abstract class InsertNode extends SearchNode implements
ComparableConsensusRequest {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -72,12 +67,6 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
*/
protected IDeviceID deviceID;
- /**
- * this index is used by wal search, its order should be protected by the
upper layer, and the
- * value should start from 1
- */
- protected long searchIndex = NO_CONSENSUS_INDEX;
-
protected boolean isGeneratedByRemoteConsensusLeader = false;
/** Physical address of data region after splitting */
@@ -167,15 +156,6 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
this.deviceID = deviceID;
}
- public long getSearchIndex() {
- return searchIndex;
- }
-
- /** Search index should start from 1 */
- public void setSearchIndex(long searchIndex) {
- this.searchIndex = searchIndex;
- }
-
public boolean isGeneratedByRemoteConsensusLeader() {
switch (config.getDataRegionConsensusProtocolClass()) {
case ConsensusFactory.IOT_CONSENSUS:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
new file mode 100644
index 00000000000..c5172534298
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+
+import static
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode.NO_CONSENSUS_INDEX;
+
+public abstract class SearchNode extends WritePlanNode {
+
+ /** this insert node doesn't need to participate in iot consensus */
+ public static final long NO_CONSENSUS_INDEX =
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
+
+ /**
+ * this index is used by wal search, its order should be protected by the
upper layer, and the
+ * value should start from 1
+ */
+ protected long searchIndex = NO_CONSENSUS_INDEX;
+
+ protected SearchNode(PlanNodeId id) {
+ super(id);
+ }
+
+ public long getSearchIndex() {
+ return searchIndex;
+ }
+
+ /** Search index should start from 1 */
+ public void setSearchIndex(long searchIndex) {
+ this.searchIndex = searchIndex;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 7f49bf24818..ed008bb47d9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -944,6 +944,8 @@ public class DataRegion implements IDataRegionForQuery {
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() -
startTime);
try {
if (deleted) {
+ logger.info(
+ "Won't insert tablet {}, because region is deleted",
insertTabletNode.getSearchIndex());
return;
}
TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
@@ -1079,6 +1081,12 @@ public class DataRegion implements IDataRegionForQuery {
long timePartitionId) {
// return when start >= end or all measurement failed
if (start >= end || insertTabletNode.allMeasurementFailed()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Won't insert tablet {}, because {}",
+ insertTabletNode.getSearchIndex(),
+ start >= end ? "start >= end" : "insertTabletNode
allMeasurementFailed");
+ }
return true;
}
@@ -2018,31 +2026,17 @@ public class DataRegion implements IDataRegionForQuery {
}
/** Seperate tsfiles in TsFileManager to sealedList and unsealedList. */
- private void separateTsFile(
+ private void getTwoKindsOfTsFiles(
List<TsFileResource> sealedResource,
List<TsFileResource> unsealedResource,
long startTime,
long endTime) {
- tsFileManager
- .getTsFileList(true, startTime, endTime)
- .forEach(
- tsFileResource -> {
- if (tsFileResource.isClosed()) {
- sealedResource.add(tsFileResource);
- } else {
- unsealedResource.add(tsFileResource);
- }
- });
- tsFileManager
- .getTsFileList(false, startTime, endTime)
- .forEach(
- tsFileResource -> {
- if (tsFileResource.isClosed()) {
- sealedResource.add(tsFileResource);
- } else {
- unsealedResource.add(tsFileResource);
- }
- });
+ List<TsFileResource> tsFileResources = tsFileManager.getTsFileList(true,
startTime, endTime);
+ tsFileResources.addAll(tsFileManager.getTsFileList(false, startTime,
endTime));
+
tsFileResources.stream().filter(TsFileResource::isClosed).forEach(sealedResource::add);
+ tsFileResources.stream()
+ .filter(resource -> !resource.isClosed())
+ .forEach(unsealedResource::add);
}
/**
@@ -2083,7 +2077,7 @@ public class DataRegion implements IDataRegionForQuery {
List<TsFileResource> sealedTsFileResource = new ArrayList<>();
List<TsFileResource> unsealedTsFileResource = new ArrayList<>();
- separateTsFile(sealedTsFileResource, unsealedTsFileResource, startTime,
endTime);
+ getTwoKindsOfTsFiles(sealedTsFileResource, unsealedTsFileResource,
startTime, endTime);
// deviceMatchInfo is used for filter the matched deviceId in
TsFileResource
// deviceMatchInfo contains the DeviceId means this device matched the
pattern
Set<String> deviceMatchInfo = new HashSet<>();
@@ -2126,7 +2120,7 @@ public class DataRegion implements IDataRegionForQuery {
}
List<TsFileResource> sealedTsFileResource = new ArrayList<>();
List<TsFileResource> unsealedTsFileResource = new ArrayList<>();
- separateTsFile(sealedTsFileResource, unsealedTsFileResource, startTime,
endTime);
+ getTwoKindsOfTsFiles(sealedTsFileResource, unsealedTsFileResource,
startTime, endTime);
deleteDataDirectlyInFile(unsealedTsFileResource, pathToDelete,
startTime, endTime);
writeUnlock();
releasedLock = true;
@@ -2161,6 +2155,17 @@ public class DataRegion implements IDataRegionForQuery {
walFlushListeners.add(walFlushListener);
}
}
+ // Some time the deletion operation doesn't have any related tsfile
processor or memtable,
+ // but it's still necessary to write to the WAL, so that iotconsensus can
synchronize the delete
+ // operation to other nodes.
+ if (walFlushListeners.isEmpty()) {
+ // TODO: IoTConsensusV2 deletion support
+ getWALNode()
+ .ifPresent(
+ walNode ->
+ walFlushListeners.add(
+ walNode.log(TsFileProcessor.MEMTABLE_NOT_EXIST,
deleteDataNode)));
+ }
return walFlushListeners;
}
@@ -3586,13 +3591,14 @@ public class DataRegion implements IDataRegionForQuery {
}
/** This method could only be used in iot consensus */
- public IWALNode getWALNode() {
+ public Optional<IWALNode> getWALNode() {
if
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
{
- throw new UnsupportedOperationException();
+ return Optional.empty();
}
// identifier should be same with getTsFileProcessor method
- return WALManager.getInstance()
- .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
+ return Optional.of(
+ WALManager.getInstance()
+ .applyForWALNode(databaseName + FILE_NAME_SEPARATOR +
dataRegionId));
}
/** Wait for this data region successfully deleted */
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 02fdf59563c..78eb2309cf6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -201,6 +201,8 @@ public class TsFileProcessor {
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS
=
PerformanceOverviewMetrics.getInstance();
+ public static final int MEMTABLE_NOT_EXIST = -1;
+
@SuppressWarnings("squid:S107")
public TsFileProcessor(
String storageGroupName,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index ac938a33f72..79d5d26673b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -128,12 +128,20 @@ public class WALNode implements IWALNode {
@Override
public WALFlushListener log(long memTableId, InsertRowNode insertRowNode) {
+ logger.debug(
+ "WAL node-{} logs insertRowNode, the search index is {}.",
+ identifier,
+ insertRowNode.getSearchIndex());
WALEntry walEntry = new WALInfoEntry(memTableId, insertRowNode);
return log(walEntry);
}
@Override
public WALFlushListener log(long memTableId, InsertRowsNode insertRowsNode) {
+ logger.debug(
+ "WAL node-{} logs insertRowsNode, the search index is {}.",
+ identifier,
+ insertRowsNode.getSearchIndex());
WALEntry walEntry = new WALInfoEntry(memTableId, insertRowsNode);
return log(walEntry);
}
@@ -141,12 +149,20 @@ public class WALNode implements IWALNode {
@Override
public WALFlushListener log(
long memTableId, InsertTabletNode insertTabletNode, int start, int end) {
+ logger.debug(
+ "WAL node-{} logs insertTabletNode, the search index is {}.",
+ identifier,
+ insertTabletNode.getSearchIndex());
WALEntry walEntry = new WALInfoEntry(memTableId, insertTabletNode, start,
end);
return log(walEntry);
}
@Override
public WALFlushListener log(long memTableId, DeleteDataNode deleteDataNode) {
+ logger.debug(
+ "WAL node-{} logs deleteDataNode, the search index is {}.",
+ identifier,
+ deleteDataNode.getSearchIndex());
WALEntry walEntry = new WALInfoEntry(memTableId, deleteDataNode);
return log(walEntry);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
index 1419f9c6136..43140cb56cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
@@ -24,12 +24,10 @@ import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
-import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.MemTableInfo;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
@@ -83,7 +81,7 @@ public class WALNodeRecoverTask implements Runnable {
logger.info("Start recovering WAL node in the directory {}", logDirectory);
// recover version id and search index
- long[] indexInfo = recoverLastFile();
+ long[] indexInfo = readLastFileInfoAndRepairIt();
long lastVersionId = indexInfo[0];
long lastSearchIndex = indexInfo[1];
@@ -156,7 +154,7 @@ public class WALNodeRecoverTask implements Runnable {
}
}
- private long[] recoverLastFile() {
+ private long[] readLastFileInfoAndRepairIt() {
File[] walFiles = WALFileUtils.listAllWALFiles(logDirectory);
if (walFiles == null || walFiles.length == 0) {
return new long[] {0L, 0L};
@@ -173,20 +171,11 @@ public class WALNodeRecoverTask implements Runnable {
WALEntry walEntry = walReader.next();
long searchIndex = DEFAULT_SEARCH_INDEX;
if (walEntry.getType().needSearch()) {
- if (walEntry.getType() != WALEntryType.DELETE_DATA_NODE) {
- InsertNode insertNode = (InsertNode) walEntry.getValue();
- if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
- searchIndex = insertNode.getSearchIndex();
- lastSearchIndex = Math.max(lastSearchIndex,
insertNode.getSearchIndex());
- fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
- }
- } else {
- DeleteDataNode deleteNode = (DeleteDataNode) walEntry.getValue();
- if (deleteNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
- searchIndex = deleteNode.getSearchIndex();
- lastSearchIndex = Math.max(lastSearchIndex,
deleteNode.getSearchIndex());
- fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
- }
+ SearchNode searchNode = (SearchNode) walEntry.getValue();
+ if (searchNode.getSearchIndex() != SearchNode.NO_CONSENSUS_INDEX) {
+ searchIndex = searchNode.getSearchIndex();
+ lastSearchIndex = Math.max(lastSearchIndex,
searchNode.getSearchIndex());
+ fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
}
}
metaData.setTruncateOffSet(walReader.getWALCurrentReadOffset());
@@ -196,12 +185,7 @@ public class WALNodeRecoverTask implements Runnable {
logger.warn("Fail to read wal logs from {}, skip them", lastWALFile, e);
}
// make sure last wal file is correct
- WALRecoverWriter walRecoverWriter = new WALRecoverWriter(lastWALFile);
- try {
- walRecoverWriter.recover(metaData);
- } catch (IOException e) {
- logger.error("Fail to recover metadata of wal file {}", lastWALFile);
- }
+ repairWalFileIfBroken(lastWALFile, metaData);
// rename last wal file when file status are inconsistent
if (WALFileUtils.parseStatusCode(lastWALFile.getName()) != fileStatus) {
String targetName =
@@ -216,12 +200,20 @@ public class WALNodeRecoverTask implements Runnable {
return new long[] {lastVersionId, lastSearchIndex};
}
+ private static void repairWalFileIfBroken(File walFile, WALMetaData
metaData) {
+ WALRepairWriter walRepairWriter = new WALRepairWriter(walFile);
+ try {
+ walRepairWriter.repair(metaData);
+ } catch (IOException e) {
+ logger.error("Fail to recover metadata of wal file {}", walFile, e);
+ }
+ }
+
private void recoverInfoFromCheckpoints() {
// parse memTables information
CheckpointRecoverUtils.CheckpointInfo info =
CheckpointRecoverUtils.recoverMemTableInfo(logDirectory);
memTableId2Info = info.getMemTableId2Info();
- memTableId2RecoverPerformer = new HashMap<>();
// update init memTable id
long maxMemTableId = info.getMaxMemTableId();
AtomicLong memTableIdCounter = AbstractMemTable.memTableIdCounter;
@@ -232,6 +224,7 @@ public class WALNodeRecoverTask implements Runnable {
}
}
// update firstValidVersionId and get recover performer from
WALRecoverManager
+ memTableId2RecoverPerformer = new HashMap<>();
for (MemTableInfo memTableInfo : memTableId2Info.values()) {
firstValidVersionId = Math.min(firstValidVersionId,
memTableInfo.getFirstFileVersionId());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
similarity index 93%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
index 900a706006b..c60c4d1a809 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java
@@ -35,15 +35,15 @@ import static
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGI
import static
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2;
import static
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2_BYTES;
-/** Check whether the wal file is broken and recover it. */
-public class WALRecoverWriter {
+/** Check whether the wal file is broken and repair it. */
+public class WALRepairWriter {
private final File logFile;
- public WALRecoverWriter(File logFile) {
+ public WALRepairWriter(File logFile) {
this.logFile = logFile;
}
- public void recover(WALMetaData metaData) throws IOException {
+ public void repair(WALMetaData metaData) throws IOException {
// locate broken data
long truncateSize;
WALFileVersion version = WALFileVersion.getVersion(logFile);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
index 31fae84ad9a..8bd47bbd917 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManag
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.tools.validate.TsFileValidationTool;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
@@ -471,6 +472,8 @@ public class AbstractCompactionTest {
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
BloomFilterCache.getInstance().clear();
+ WALManager.getInstance().clear();
+
EnvironmentUtils.cleanAllDir();
if (SEQ_DIRS.exists()) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
index d9d92034889..c2920db6050 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
@@ -55,6 +55,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -70,6 +72,9 @@ import static org.junit.Assert.assertEquals;
public class RewriteCrossSpaceCompactionWithFastPerformerTest extends
AbstractCompactionTest {
+ private static final Logger LOGGER =
+
LoggerFactory.getLogger(RewriteCrossSpaceCompactionWithFastPerformerTest.class);
+
private final String oldThreadName = Thread.currentThread().getName();
@Before
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
similarity index 94%
rename from
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
rename to
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
index 7d49c448354..aca54fe2f19 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
@@ -51,7 +51,7 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashSet;
-public class WALRecoverWriterTest {
+public class WALRepairWriterTest {
private final File logFile =
new File(
TestConstant.BASE_OUTPUT_PATH.concat(
@@ -68,9 +68,8 @@ public class WALRecoverWriterTest {
logFile.createNewFile();
long firstSearchIndex =
WALFileUtils.parseStartSearchIndex(logFile.getName());
WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new
ArrayList<>(), new HashSet<>());
- // recover
- WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
- walRecoverWriter.recover(walMetaData);
+ // repair
+ new WALRepairWriter(logFile).repair(walMetaData);
// verify file, marker + metadata(search index + size number) + metadata
size + head magic
// string + tail magic string
Assert.assertEquals(
@@ -94,9 +93,8 @@ public class WALRecoverWriterTest {
}
long firstSearchIndex =
WALFileUtils.parseStartSearchIndex(logFile.getName());
WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new
ArrayList<>(), new HashSet<>());
- // recover
- WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
- walRecoverWriter.recover(walMetaData);
+ // repair
+ new WALRepairWriter(logFile).repair(walMetaData);
// verify file, marker + metadata(search index + size number) + metadata
size + magic string
Assert.assertEquals(
Byte.BYTES
@@ -122,9 +120,8 @@ public class WALRecoverWriterTest {
try (WALWriter walWriter = new WALWriter(logFile)) {
walWriter.write(buffer.getBuffer(), walMetaData);
}
- // recover
- WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
- walRecoverWriter.recover(walMetaData);
+ // repair
+ new WALRepairWriter(logFile).repair(walMetaData);
// verify file
try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
Assert.assertTrue(reader.hasNext());
@@ -146,9 +143,8 @@ public class WALRecoverWriterTest {
try (WALWriter walWriter = new WALWriter(logFile)) {
walWriter.write(buffer.getBuffer(), walMetaData);
}
- // recover
- WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
- walRecoverWriter.recover(walMetaData);
+ // repair
+ new WALRepairWriter(logFile).repair(walMetaData);
// verify file
try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
Assert.assertTrue(reader.hasNext());
@@ -175,9 +171,8 @@ public class WALRecoverWriterTest {
try (FileChannel channel = FileChannel.open(logFile.toPath(),
StandardOpenOption.APPEND)) {
channel.truncate(len - 1);
}
- // recover
- WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
- walRecoverWriter.recover(walMetaData);
+ // repair
+ new WALRepairWriter(logFile).repair(walMetaData);
// verify file
try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
Assert.assertTrue(reader.hasNext());