This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b07fafe3954 Optimized wal file deletion algorithm (#11948)
b07fafe3954 is described below
commit b07fafe395442c90eb981585619a233ce42b373b
Author: Alan Choo <[email protected]>
AuthorDate: Tue Jan 23 20:04:13 2024 +0800
Optimized wal file deletion algorithm (#11948)
Co-authored-by: Zhijia Cao <[email protected]>
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 +-
.../db/storageengine/dataregion/DataRegion.java | 6 +-
.../dataregion/memtable/AbstractMemTable.java | 18 +
.../dataregion/memtable/IMemTable.java | 2 +
.../dataregion/memtable/TsFileProcessor.java | 5 +
.../dataregion/wal/buffer/WALBuffer.java | 57 +-
.../wal/checkpoint/CheckpointManager.java | 50 +-
.../dataregion/wal/checkpoint/MemTableInfo.java | 22 +-
.../dataregion/wal/io/WALByteBufReader.java | 26 +-
.../dataregion/wal/io/WALMetaData.java | 76 ++-
.../storageengine/dataregion/wal/node/WALNode.java | 198 +++----
.../dataregion/wal/recover/WALNodeRecoverTask.java | 41 +-
.../storageengine/dataregion/DataRegionTest.java | 4 +
.../dataregion/wal/node/WALEntryHandlerTest.java | 13 +-
.../wal/node/WalDeleteOutdatedNewTest.java | 585 +++++++++++++++++++++
.../wal/recover/WALRecoverWriterTest.java | 9 +-
.../resources/conf/iotdb-common.properties | 20 +-
17 files changed, 944 insertions(+), 200 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3b6bac840a8..26659d87f80 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -390,25 +390,25 @@ public class IoTDBConfig {
private boolean enableTimedFlushSeqMemtable = true;
/**
- * If a memTable's created time is older than current time minus this, the
memtable will be
+ * If a memTable's last update time is older than current time minus this,
the memtable will be
* flushed to disk.(only check sequence tsfiles' memtables) Unit: ms
*/
- private long seqMemtableFlushInterval = 3 * 60 * 60 * 1000L;
+ private long seqMemtableFlushInterval = 10 * 60 * 1000L;
/** The interval to check whether sequence memtables need flushing. Unit: ms
*/
- private long seqMemtableFlushCheckInterval = 10 * 60 * 1000L;
+ private long seqMemtableFlushCheckInterval = 30 * 1000L;
/** Whether to timed flush unsequence tsfiles' memtables. */
private boolean enableTimedFlushUnseqMemtable = true;
/**
- * If a memTable's created time is older than current time minus this, the
memtable will be
+ * If a memTable's last update time is older than current time minus this,
the memtable will be
* flushed to disk.(only check unsequence tsfiles' memtables) Unit: ms
*/
- private long unseqMemtableFlushInterval = 3 * 60 * 60 * 1000L;
+ private long unseqMemtableFlushInterval = 10 * 60 * 1000L;
/** The interval to check whether unsequence memtables need flushing. Unit:
ms */
- private long unseqMemtableFlushCheckInterval = 10 * 60 * 1000L;
+ private long unseqMemtableFlushCheckInterval = 30 * 1000L;
/** The sort algorithm used in TVList */
private TVListSortAlgorithm tvListSortAlgorithm = TVListSortAlgorithm.TIM;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index e74c99c762c..8ad98d45eae 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -406,7 +406,7 @@ public class DataRegion implements IDataRegionForQuery {
if (lastLogTime + config.getRecoveryLogIntervalInMs() <
System.currentTimeMillis()) {
logger.info(
"The data region {}[{}] has recovered {}%, please wait a
moment.",
- databaseName, dataRegionId, recoveredFilesNum * 1.0 /
numOfFilesToRecover);
+ databaseName, dataRegionId, recoveredFilesNum * 100.0 /
numOfFilesToRecover);
lastLogTime = System.currentTimeMillis();
}
}
@@ -1680,7 +1680,7 @@ public class DataRegion implements IDataRegionForQuery {
new ArrayList<>(workSequenceTsFileProcessors.values());
long timeLowerBound = System.currentTimeMillis() -
config.getSeqMemtableFlushInterval();
for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
- if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
+ if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) {
logger.info(
"Exceed sequence memtable flush interval, so flush working
memtable of time partition {} in database {}[{}]",
tsFileProcessor.getTimeRangeId(),
@@ -1706,7 +1706,7 @@ public class DataRegion implements IDataRegionForQuery {
long timeLowerBound = System.currentTimeMillis() -
config.getUnseqMemtableFlushInterval();
for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
- if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
+ if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) {
logger.info(
"Exceed unsequence memtable flush interval, so flush working
memtable of time partition {} in database {}[{}]",
tsFileProcessor.getTimeRangeId(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 1488659e6cf..0b2ff4a04cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -98,6 +98,14 @@ public abstract class AbstractMemTable implements IMemTable {
private final long createdTime = System.currentTimeMillis();
+ /** this time is updated by the timed flush, same as createdTime when the
feature is disabled. */
+ private long updateTime = createdTime;
+ /**
+ * totalPointsNum observed at the last update-time check; when it differs from
the current totalPointsNum the memTable has been written since then and
+ * updateTime is refreshed
+ */
+ private long lastTotalPointsNum = totalPointsNum;
+
private String database;
private String dataRegionId;
@@ -589,6 +597,16 @@ public abstract class AbstractMemTable implements
IMemTable {
return createdTime;
}
+ /** Get the last update time, refreshing it to now if totalPointsNum changed since the last call. */
+ @Override
+ public long getUpdateTime() {
+ if (lastTotalPointsNum != totalPointsNum) {
+ lastTotalPointsNum = totalPointsNum;
+ updateTime = System.currentTimeMillis();
+ }
+ return updateTime;
+ }
+
@Override
public FlushStatus getFlushStatus() {
return flushStatus;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 4efcbdf6f8c..8e1a883d2a4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -168,6 +168,8 @@ public interface IMemTable extends WALEntryValue {
long getCreatedTime();
+ long getUpdateTime();
+
FlushStatus getFlushStatus();
void setFlushStatus(FlushStatus flushStatus);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 4cd143ad5fb..5bbf9ac7fe3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1576,6 +1576,11 @@ public class TsFileProcessor {
return workMemTable != null ? workMemTable.getCreatedTime() :
Long.MAX_VALUE;
}
+ /** Return Long.MAX_VALUE if workMemTable is null */
+ public long getWorkMemTableUpdateTime() {
+ return workMemTable != null ? workMemTable.getUpdateTime() :
Long.MAX_VALUE;
+ }
+
public long getLastWorkMemtableFlushTime() {
return lastWorkMemtableFlushTime;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index dea4a55c4ab..d6deb613e6f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.CheckpointMan
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALNodeClosedException;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.db.utils.MmapUtil;
@@ -40,14 +41,23 @@ import org.apache.iotdb.db.utils.MmapUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -102,6 +112,9 @@ public class WALBuffer extends AbstractWALBuffer {
// single thread to sync syncingBuffer to disk
private final ExecutorService syncBufferThread;
+ // wal file version id -> ids of the memTables which have entries in that wal file
+ private final Map<Long, Set<Long>> memTableIdsOfWal = new
ConcurrentHashMap<>();
+
public WALBuffer(String identifier, String logDirectory) throws
FileNotFoundException {
this(identifier, logDirectory, new CheckpointManager(identifier,
logDirectory), 0, 0L);
}
@@ -183,6 +196,7 @@ public class WALBuffer extends AbstractWALBuffer {
/** This info class passes some extra info from serializeThread to
syncBufferThread. */
private static class SerializeInfo {
final WALMetaData metaData = new WALMetaData();
+ final Map<Long, Long> memTableId2WalDiskUsage = new HashMap<>();
final List<Checkpoint> checkpoints = new ArrayList<>();
final List<WALFlushListener> fsyncListeners = new ArrayList<>();
WALFlushListener rollWALFileWriterListener = null;
@@ -279,10 +293,11 @@ public class WALBuffer extends AbstractWALBuffer {
return;
}
- int size = byteBufferView.position();
+ int startPosition = byteBufferView.position();
+ int size;
try {
walEntry.serialize(byteBufferView);
- size = byteBufferView.position() - size;
+ size = byteBufferView.position() - startPosition;
} catch (Exception e) {
logger.error(
"Fail to serialize WALEntry to wal node-{}'s buffer, discard it.",
identifier, e);
@@ -304,7 +319,9 @@ public class WALBuffer extends AbstractWALBuffer {
}
// update related info
totalSize += size;
- info.metaData.add(size, searchIndex);
+ info.metaData.add(size, searchIndex, walEntry.getMemTableId());
+ info.memTableId2WalDiskUsage.compute(
+ walEntry.getMemTableId(), (k, v) -> v == null ? size : v + size);
walEntry.getWalFlushListener().getWalEntryHandler().setSize(size);
info.fsyncListeners.add(walEntry.getWalFlushListener());
}
@@ -509,6 +526,12 @@ public class WALBuffer extends AbstractWALBuffer {
switchSyncingBufferToIdle();
}
+ // update info
+ memTableIdsOfWal
+ .computeIfAbsent(currentWALFileVersion, memTableIds -> new
HashSet<>())
+ .addAll(info.metaData.getMemTablesId());
+
checkpointManager.updateCostOfActiveMemTables(info.memTableId2WalDiskUsage);
+
boolean forceSuccess = false;
// try to roll log writer
if (info.rollWALFileWriterListener != null
@@ -682,4 +705,32 @@ public class WALBuffer extends AbstractWALBuffer {
public CheckpointManager getCheckpointManager() {
return checkpointManager;
}
+
+ public void removeMemTableIdsOfWal(Long walVersionId) {
+ this.memTableIdsOfWal.remove(walVersionId);
+ }
+
+ public Set<Long> getMemTableIds(long fileVersionId) {
+ if (fileVersionId >= currentWALFileVersion) {
+ return Collections.emptySet();
+ }
+ return memTableIdsOfWal.computeIfAbsent(
+ fileVersionId,
+ id -> {
+ try {
+ File file = WALFileUtils.getWALFile(new File(logDirectory), id);
+ return WALMetaData.readFromWALFile(
+ file, FileChannel.open(file.toPath(),
StandardOpenOption.READ))
+ .getMemTablesId();
+ } catch (IOException e) {
+ logger.error("Fail to read memTable ids from the wal file {}.",
id);
+ return new HashSet<>();
+ }
+ });
+ }
+
+ @TestOnly
+ public Map<Long, Set<Long>> getMemTableIdsOfWal() {
+ return memTableIdsOfWal;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
index 50ac8ca6155..7d7ff0f76b4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/CheckpointManager.java
@@ -40,9 +40,10 @@ import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -61,7 +62,7 @@ public class CheckpointManager implements AutoCloseable {
private final Lock infoLock = new ReentrantLock();
// region these variables should be protected by infoLock
// memTable id -> memTable info
- private final Map<Long, MemTableInfo> memTableId2Info = new HashMap<>();
+ private final Map<Long, MemTableInfo> memTableId2Info = new
ConcurrentHashMap<>();
// cache the biggest byte buffer to serialize checkpoint
// it's safe to use volatile here to make this reference thread-safe.
@SuppressWarnings("squid:S3077")
@@ -88,7 +89,7 @@ public class CheckpointManager implements AutoCloseable {
logHeader();
}
- public List<MemTableInfo> snapshotMemTableInfos() {
+ public List<MemTableInfo> activeOrPinnedMemTables() {
infoLock.lock();
try {
return new ArrayList<>(memTableId2Info.values());
@@ -121,7 +122,7 @@ public class CheckpointManager implements AutoCloseable {
*/
private void makeGlobalInfoCP() {
long start = System.nanoTime();
- List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
+ List<MemTableInfo> memTableInfos = activeOrPinnedMemTables();
memTableInfos.removeIf(MemTableInfo::isFlushed);
Checkpoint checkpoint = new
Checkpoint(CheckpointType.GLOBAL_MEMORY_TABLE_INFO, memTableInfos);
logByCachedByteBuffer(checkpoint);
@@ -315,20 +316,13 @@ public class CheckpointManager implements AutoCloseable {
}
// endregion
- /** Get MemTableInfo of oldest MemTable, whose first version id is smallest.
*/
- public MemTableInfo getOldestMemTableInfo() {
+ /** Get MemTableInfo of the oldest unpinned MemTable, i.e. the one whose memTable id is
smallest; returns null if there is none. */
+ public MemTableInfo getOldestUnpinnedMemTableInfo() {
// find oldest memTable
- List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
- if (memTableInfos.isEmpty()) {
- return null;
- }
- MemTableInfo oldestMemTableInfo = memTableInfos.get(0);
- for (MemTableInfo memTableInfo : memTableInfos) {
- if (oldestMemTableInfo.getFirstFileVersionId() >
memTableInfo.getFirstFileVersionId()) {
- oldestMemTableInfo = memTableInfo;
- }
- }
- return oldestMemTableInfo;
+ return activeOrPinnedMemTables().stream()
+ .filter(memTableInfo -> !memTableInfo.isPinned())
+ .min(Comparator.comparingLong(MemTableInfo::getMemTableId))
+ .orElse(null);
}
/**
@@ -337,7 +331,7 @@ public class CheckpointManager implements AutoCloseable {
* @return Return {@link Long#MIN_VALUE} if no file is valid
*/
public long getFirstValidWALVersionId() {
- List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
+ List<MemTableInfo> memTableInfos = activeOrPinnedMemTables();
long firstValidVersionId = memTableInfos.isEmpty() ? Long.MIN_VALUE :
Long.MAX_VALUE;
for (MemTableInfo memTableInfo : memTableInfos) {
firstValidVersionId = Math.min(firstValidVersionId,
memTableInfo.getFirstFileVersionId());
@@ -345,20 +339,28 @@ public class CheckpointManager implements AutoCloseable {
return firstValidVersionId;
}
+ /** Update wal disk cost of active memTables. */
+ public void updateCostOfActiveMemTables(Map<Long, Long>
memTableId2WalDiskUsage) {
+ for (Map.Entry<Long, Long> memTableWalUsage :
memTableId2WalDiskUsage.entrySet()) {
+ memTableId2Info.computeIfPresent(
+ memTableWalUsage.getKey(),
+ (k, v) -> {
+ v.addWalDiskUsage(memTableWalUsage.getValue());
+ return v;
+ });
+ }
+ }
+
/** Get total cost of active memTables. */
public long getTotalCostOfActiveMemTables() {
- List<MemTableInfo> memTableInfos = snapshotMemTableInfos();
+ List<MemTableInfo> memTableInfos = activeOrPinnedMemTables();
long totalCost = 0;
for (MemTableInfo memTableInfo : memTableInfos) {
// flushed memTables are not active
if (memTableInfo.isFlushed()) {
continue;
}
- if (config.isEnableMemControl()) {
- totalCost += memTableInfo.getMemTable().getTVListsRamCost();
- } else {
- totalCost++;
- }
+ totalCost += memTableInfo.getWalDiskUsage();
}
return totalCost;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
index 109652d8786..b9744d444ed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/checkpoint/MemTableInfo.java
@@ -37,6 +37,12 @@ import java.util.Objects;
public class MemTableInfo implements WALEntryValue {
// memTable id 8 bytes, first version id 8 bytes
private static final int FIXED_SERIALIZED_SIZE = Long.BYTES * 2;
+ // memTable id
+ private long memTableId;
+ // path of the tsFile which this memTable will be flushed to
+ private String tsFilePath;
+ // version id of the file where this memTable's first WALEntry is located
+ private volatile long firstFileVersionId;
// memTable
private IMemTable memTable;
@@ -44,14 +50,10 @@ public class MemTableInfo implements WALEntryValue {
private int pinCount;
// memTable is flushed or not
private boolean flushed;
- // memTable id
- private long memTableId;
// data region id
private int dataRegionId;
- // path of the tsFile which this memTable will be flushed to
- private String tsFilePath;
- // version id of the file where this memTable's first WALEntry is located
- private volatile long firstFileVersionId;
+ // total wal usage of this memTable
+ private long walDiskUsage;
private MemTableInfo() {}
@@ -158,4 +160,12 @@ public class MemTableInfo implements WALEntryValue {
public void setFirstFileVersionId(long firstFileVersionId) {
this.firstFileVersionId = firstFileVersionId;
}
+
+ public long getWalDiskUsage() {
+ return walDiskUsage;
+ }
+
+ public void addWalDiskUsage(long size) {
+ walDiskUsage += size;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
index a57d7faf750..f101eaf3647 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
@@ -46,23 +46,8 @@ public class WALByteBufReader implements Closeable {
public WALByteBufReader(File logFile, FileChannel channel) throws
IOException {
this.logFile = logFile;
this.channel = channel;
- if (channel.size() < WALWriter.MAGIC_STRING_BYTES
- || !readTailMagic().equals(WALWriter.MAGIC_STRING)) {
- throw new IOException(String.format("Broken wal file %s", logFile));
- }
- // load metadata size
- ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
- long position = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES;
- channel.read(metadataSizeBuf, position);
- metadataSizeBuf.flip();
- // load metadata
- int metadataSize = metadataSizeBuf.getInt();
- ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize);
- channel.read(metadataBuf, position - metadataSize);
- metadataBuf.flip();
- metaData = WALMetaData.deserialize(metadataBuf);
- // init iterator
- sizeIterator = metaData.getBuffersSize().iterator();
+ this.metaData = WALMetaData.readFromWALFile(logFile, channel);
+ this.sizeIterator = metaData.getBuffersSize().iterator();
channel.position(0);
}
@@ -84,11 +69,8 @@ public class WALByteBufReader implements Closeable {
return buffer;
}
- private String readTailMagic() throws IOException {
- ByteBuffer magicStringBytes =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
- channel.read(magicStringBytes, channel.size() -
WALWriter.MAGIC_STRING_BYTES);
- magicStringBytes.flip();
- return new String(magicStringBytes.array());
+ public WALMetaData getMetaData() {
+ return metaData;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
index f9b97ece3e5..b8cb4dfef13 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
@@ -22,9 +22,14 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.db.utils.SerializedSize;
+import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
* Metadata exists at the end of each wal file, including each entry's size,
search index of first
@@ -38,21 +43,25 @@ public class WALMetaData implements SerializedSize {
private long firstSearchIndex;
// each entry's size
private final List<Integer> buffersSize;
+ // memTable ids of this wal file
+ private final Set<Long> memTablesId;
public WALMetaData() {
- this(ConsensusReqReader.DEFAULT_SEARCH_INDEX, new ArrayList<>());
+ this(ConsensusReqReader.DEFAULT_SEARCH_INDEX, new ArrayList<>(), new
HashSet<>());
}
- public WALMetaData(long firstSearchIndex, List<Integer> buffersSize) {
+ public WALMetaData(long firstSearchIndex, List<Integer> buffersSize,
Set<Long> memTablesId) {
this.firstSearchIndex = firstSearchIndex;
this.buffersSize = buffersSize;
+ this.memTablesId = memTablesId;
}
- public void add(int size, long searchIndex) {
+ public void add(int size, long searchIndex, long memTableId) {
if (buffersSize.isEmpty()) {
firstSearchIndex = searchIndex;
}
buffersSize.add(size);
+ memTablesId.add(memTableId);
}
public void addAll(WALMetaData metaData) {
@@ -60,11 +69,14 @@ public class WALMetaData implements SerializedSize {
firstSearchIndex = metaData.getFirstSearchIndex();
}
buffersSize.addAll(metaData.getBuffersSize());
+ memTablesId.addAll(metaData.getMemTablesId());
}
@Override
public int serializedSize() {
- return FIXED_SERIALIZED_SIZE + buffersSize.size() * Integer.BYTES;
+ return FIXED_SERIALIZED_SIZE
+ + buffersSize.size() * Integer.BYTES
+ + (memTablesId.isEmpty() ? 0 : Integer.BYTES + memTablesId.size() *
Long.BYTES);
}
public void serialize(ByteBuffer buffer) {
@@ -73,6 +85,12 @@ public class WALMetaData implements SerializedSize {
for (int size : buffersSize) {
buffer.putInt(size);
}
+ if (!memTablesId.isEmpty()) {
+ buffer.putInt(memTablesId.size());
+ for (long memTableId : memTablesId) {
+ buffer.putLong(memTableId);
+ }
+ }
}
public static WALMetaData deserialize(ByteBuffer buffer) {
@@ -82,14 +100,62 @@ public class WALMetaData implements SerializedSize {
for (int i = 0; i < entriesNum; ++i) {
buffersSize.add(buffer.getInt());
}
- return new WALMetaData(firstSearchIndex, buffersSize);
+ Set<Long> memTablesId = new HashSet<>();
+ if (buffer.hasRemaining()) {
+ int memTablesIdNum = buffer.getInt();
+ for (int i = 0; i < memTablesIdNum; ++i) {
+ memTablesId.add(buffer.getLong());
+ }
+ }
+ return new WALMetaData(firstSearchIndex, buffersSize, memTablesId);
}
public List<Integer> getBuffersSize() {
return buffersSize;
}
+ public Set<Long> getMemTablesId() {
+ return memTablesId;
+ }
+
public long getFirstSearchIndex() {
return firstSearchIndex;
}
+
+ public static WALMetaData readFromWALFile(File logFile, FileChannel channel)
throws IOException {
+ if (channel.size() < WALWriter.MAGIC_STRING_BYTES
+ || !readTailMagic(channel).equals(WALWriter.MAGIC_STRING)) {
+ throw new IOException(String.format("Broken wal file %s", logFile));
+ }
+ // load metadata size
+ ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
+ long position = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES;
+ channel.read(metadataSizeBuf, position);
+ metadataSizeBuf.flip();
+ // load metadata
+ int metadataSize = metadataSizeBuf.getInt();
+ ByteBuffer metadataBuf = ByteBuffer.allocate(metadataSize);
+ channel.read(metadataBuf, position - metadataSize);
+ metadataBuf.flip();
+ WALMetaData metaData = WALMetaData.deserialize(metadataBuf);
+ if (metaData.memTablesId.isEmpty()) {
+ int offset = Byte.BYTES;
+ for (int size : metaData.buffersSize) {
+ channel.position(offset);
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ channel.read(buffer);
+ buffer.clear();
+ metaData.memTablesId.add(buffer.getLong());
+ offset += size;
+ }
+ }
+ return metaData;
+ }
+
+ private static String readTailMagic(FileChannel channel) throws IOException {
+ ByteBuffer magicStringBytes =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
+ channel.read(magicStringBytes, channel.size() -
WALWriter.MAGIC_STRING_BYTES);
+ magicStringBytes.flip();
+ return new String(magicStringBytes.array());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 9d117c6dccf..f59bdf8cc2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.node;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -66,18 +65,18 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}.
If search is enabled,
@@ -104,11 +103,6 @@ public class WALNode implements IWALNode {
// memTable id -> memTable snapshot count
// used to avoid write amplification caused by frequent snapshot
private final Map<Long, Integer> memTableSnapshotCount = new
ConcurrentHashMap<>();
- // total cost of flushedMemTables
- // when memControl enabled, cost is memTable ram cost, otherwise cost is
memTable count
- private final AtomicLong totalCostOfFlushedMemTables = new AtomicLong();
- // version id -> cost sum of memTables flushed at this file version
- private final Map<Long, Long> walFileVersionId2MemTablesTotalCost = new
ConcurrentHashMap<>();
// insert nodes whose search index are before this value can be deleted
safely
private volatile long safelyDeletedSearchIndex =
DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
@@ -150,6 +144,7 @@ public class WALNode implements IWALNode {
}
private WALFlushListener log(WALEntry walEntry) {
+
buffer.write(walEntry);
// set handler for pipe
walEntry.getWalFlushListener().getWalEntryHandler().setWalNode(this,
walEntry.getMemTableId());
@@ -174,12 +169,6 @@ public class WALNode implements IWALNode {
// remove snapshot info
memTableSnapshotCount.remove(memTable.getMemTableId());
- // update cost info
- long cost = config.isEnableMemControl() ? memTable.getTVListsRamCost() : 1;
- long currentWALFileVersion = buffer.getCurrentWALFileVersion();
- walFileVersionId2MemTablesTotalCost.compute(
- currentWALFileVersion, (k, v) -> v == null ? cost : v + cost);
- totalCostOfFlushedMemTables.addAndGet(cost);
}
@Override
@@ -231,16 +220,17 @@ public class WALNode implements IWALNode {
}
private class DeleteOutdatedFileTask implements Runnable {
+ private File[] sortedWalFilesExcludingLast;
+
+ private List<MemTableInfo> activeOrPinnedMemTables;
+
private static final int MAX_RECURSION_TIME = 5;
- // .wal files whose version ids are less than first valid version id
should be deleted
- private long firstValidVersionId;
+
// the effective information ratio
private double effectiveInfoRatio = 0d;
private List<Long> pinnedMemTableIds;
- private File[] filesShouldDelete;
-
private int fileIndexAfterFilterSafelyDeleteIndex = Integer.MAX_VALUE;
private List<Long> successfullyDeleted;
private long deleteFileSize;
@@ -251,21 +241,44 @@ public class WALNode implements IWALNode {
// Do nothing
}
- private void init() {
- this.firstValidVersionId = initFirstValidWALVersionId();
- this.filesShouldDelete =
logDirectory.listFiles(this::filterFilesToDelete);
- if (filesShouldDelete == null) {
- filesShouldDelete = new File[0];
+ private boolean initAndCheckIfNeedContinue() {
+ rollWalFileIfHaveNoActiveMemTable();
+ File[] allWalFilesOfOneNode = WALFileUtils.listAllWALFiles(logDirectory);
+ if (allWalFilesOfOneNode == null || allWalFilesOfOneNode.length <= 1) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "wal node-{}:no wal file or wal file number less than or equal
to one was found",
+ identifier);
+ }
+ return false;
}
+ WALFileUtils.ascSortByVersionId(allWalFilesOfOneNode);
+ this.sortedWalFilesExcludingLast =
+ Arrays.copyOfRange(allWalFilesOfOneNode, 0,
allWalFilesOfOneNode.length - 1);
+ this.activeOrPinnedMemTables =
checkpointManager.activeOrPinnedMemTables();
this.pinnedMemTableIds = initPinnedMemTableIds();
- WALFileUtils.ascSortByVersionId(filesShouldDelete);
this.fileIndexAfterFilterSafelyDeleteIndex =
initFileIndexAfterFilterSafelyDeleteIndex();
this.successfullyDeleted = new ArrayList<>();
this.deleteFileSize = 0;
+ return true;
+ }
+
+ /**
+ * If the checkpoint manager has no memTable info left, the relevant memTables in the file
have been successfully flushed, so we should
+ * roll to a new wal file (when the current one is not empty) so that the current file can be deleted
+ */
+ public void rollWalFileIfHaveNoActiveMemTable() {
+ long firstVersionId = checkpointManager.getFirstValidWALVersionId();
+ if (firstVersionId == Long.MIN_VALUE) {
+ // roll wal log writer to delete current wal file
+ if (buffer.getCurrentWALFileSize() > 0) {
+ rollWALFile();
+ }
+ }
}
private List<Long> initPinnedMemTableIds() {
- List<MemTableInfo> memTableInfos =
checkpointManager.snapshotMemTableInfos();
+ List<MemTableInfo> memTableInfos =
checkpointManager.activeOrPinnedMemTables();
if (memTableInfos.isEmpty()) {
return new ArrayList<>();
}
@@ -283,8 +296,11 @@ public class WALNode implements IWALNode {
// The intent of the loop execution here is to try to get as many
memTable flush or snapshot
// as possible when the valid information ratio is less than the
configured value.
while (recursionTime < MAX_RECURSION_TIME) {
- // init delete outdated file task fields
- init();
+ // init delete outdated file task fields, if the number of wal files
is not greater than one, the
+ // subsequent logic is not executed
+ if (!initAndCheckIfNeedContinue()) {
+ break;
+ }
// delete outdated WAL files and record which delete successfully and
which delete failed.
deleteOutdatedFilesAndUpdateMetric();
@@ -308,42 +324,36 @@ public class WALNode implements IWALNode {
private void updateEffectiveInfoRationAndUpdateMetric() {
// calculate effective information ratio
long costOfActiveMemTables =
checkpointManager.getTotalCostOfActiveMemTables();
- long costOfFlushedMemTables = totalCostOfFlushedMemTables.get();
- long totalCost = costOfActiveMemTables + costOfFlushedMemTables;
+ long totalCost =
+ (getCurrentWALFileVersion()
+ -
checkpointManager.getOldestUnpinnedMemTableInfo().getFirstFileVersionId()
+ + 1)
+ * config.getWalFileSizeThresholdInByte();
if (totalCost == 0) {
return;
}
effectiveInfoRatio = (double) costOfActiveMemTables / totalCost;
logger.debug(
- "Effective information ratio is {}, active memTables cost is {},
flushed memTables cost is {}",
+ "Effective information ratio is {}, active memTables cost is {},
total cost is {}",
effectiveInfoRatio,
costOfActiveMemTables,
- costOfFlushedMemTables);
+ totalCost);
WRITING_METRICS.recordWALNodeEffectiveInfoRatio(identifier,
effectiveInfoRatio);
}
private void summarizeExecuteResult() {
- if (filesShouldDelete.length == 0) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "wal node-{}:no wal file was found that should be deleted,
current first valid version id is {}",
- identifier,
- firstValidVersionId);
- }
- return;
- }
-
if (!pinnedMemTableIds.isEmpty()
- || fileIndexAfterFilterSafelyDeleteIndex < filesShouldDelete.length)
{
+ || fileIndexAfterFilterSafelyDeleteIndex <
sortedWalFilesExcludingLast.length) {
if (logger.isDebugEnabled()) {
StringBuilder summary =
new StringBuilder(
String.format(
- "wal node-%s delete outdated files summary:the range
that should be removed is: [%d,%d], delete successful is [%s], end file index
is: [%s].The following reasons influenced the result: %s",
+ "wal node-%s delete outdated files summary:the range is:
[%d,%d], delete successful is [%s], safely delete file index is: [%s].The
following reasons influenced the result: %s",
identifier,
-
WALFileUtils.parseVersionId(filesShouldDelete[0].getName()),
+
WALFileUtils.parseVersionId(sortedWalFilesExcludingLast[0].getName()),
WALFileUtils.parseVersionId(
- filesShouldDelete[filesShouldDelete.length -
1].getName()),
+
sortedWalFilesExcludingLast[sortedWalFilesExcludingLast.length - 1]
+ .getName()),
StringUtils.join(successfullyDeleted, ","),
fileIndexAfterFilterSafelyDeleteIndex,
System.getProperty("line.separator")));
@@ -355,7 +365,7 @@ public class WALNode implements IWALNode {
.append(".")
.append(System.getProperty("line.separator"));
}
- if (fileIndexAfterFilterSafelyDeleteIndex <
filesShouldDelete.length) {
+ if (fileIndexAfterFilterSafelyDeleteIndex <
sortedWalFilesExcludingLast.length) {
summary.append(
String.format(
"- The data in the wal file was not consumed by the
consensus group,current search index is %d, safely delete index is %d",
@@ -367,33 +377,28 @@ public class WALNode implements IWALNode {
} else {
logger.debug(
- "Successfully delete {} outdated wal files for wal node-{},first
valid version id is {}",
+ "Successfully delete {} outdated wal files for wal node-{}",
successfullyDeleted.size(),
- identifier,
- firstValidVersionId);
+ identifier);
}
}
/** Delete obsolete wal files while recording which succeeded or failed */
private void deleteOutdatedFilesAndUpdateMetric() {
- if (filesShouldDelete.length == 0) {
- return;
- }
- for (int i = 0; i < fileIndexAfterFilterSafelyDeleteIndex; ++i) {
- long fileSize = filesShouldDelete[i].length();
- long versionId =
WALFileUtils.parseVersionId(filesShouldDelete[i].getName());
- if (filesShouldDelete[i].delete()) {
- deleteFileSize += fileSize;
- Long memTableRamCostSum =
walFileVersionId2MemTablesTotalCost.remove(versionId);
- if (memTableRamCostSum != null) {
- totalCostOfFlushedMemTables.addAndGet(-memTableRamCostSum);
+ for (File currentWal : sortedWalFilesExcludingLast) {
+ long searchIndex =
WALFileUtils.parseStartSearchIndex(currentWal.getName());
+ WALFileStatus walFileStatus =
WALFileUtils.parseStatusCode(currentWal.getName());
+ long versionId = WALFileUtils.parseVersionId(currentWal.getName());
+ if (canDeleteFile(searchIndex, walFileStatus, versionId)) {
+ long fileSize = currentWal.length();
+ if (currentWal.delete()) {
+ deleteFileSize += fileSize;
+ buffer.removeMemTableIdsOfWal(versionId);
+ successfullyDeleted.add(versionId);
+ } else {
+ logger.info(
+ "Fail to delete outdated wal file {} of wal node-{}.",
currentWal, identifier);
}
- successfullyDeleted.add(versionId);
- } else {
- logger.info(
- "Fail to delete outdated wal file {} of wal node-{}.",
- filesShouldDelete[i],
- identifier);
}
}
buffer.subtractDiskUsage(deleteFileSize);
@@ -403,15 +408,15 @@ public class WALNode implements IWALNode {
private int initFileIndexAfterFilterSafelyDeleteIndex() {
int endFileIndex =
safelyDeletedSearchIndex == DEFAULT_SAFELY_DELETED_SEARCH_INDEX
- ? filesShouldDelete.length
+ ? sortedWalFilesExcludingLast.length
: WALFileUtils.binarySearchFileBySearchIndex(
- filesShouldDelete, safelyDeletedSearchIndex + 1);
+ sortedWalFilesExcludingLast, safelyDeletedSearchIndex + 1);
// delete files whose file status is CONTAINS_NONE_SEARCH_INDEX
if (endFileIndex == -1) {
endFileIndex = 0;
}
- while (endFileIndex < filesShouldDelete.length) {
- if
(WALFileUtils.parseStatusCode(filesShouldDelete[endFileIndex].getName())
+ while (endFileIndex < sortedWalFilesExcludingLast.length) {
+ if
(WALFileUtils.parseStatusCode(sortedWalFilesExcludingLast[endFileIndex].getName())
== WALFileStatus.CONTAINS_SEARCH_INDEX) {
break;
}
@@ -420,17 +425,6 @@ public class WALNode implements IWALNode {
return endFileIndex;
}
- private boolean filterFilesToDelete(File dir, String name) {
- Pattern pattern = WALFileUtils.WAL_FILE_NAME_PATTERN;
- Matcher matcher = pattern.matcher(name);
- boolean toDelete = false;
- if (matcher.find()) {
- long versionId =
Long.parseLong(matcher.group(IoTDBConstant.WAL_VERSION_ID));
- toDelete = versionId < firstValidVersionId;
- }
- return toDelete;
- }
-
/** Return true iff effective information ratio is too small or disk usage
is too large. */
private boolean shouldSnapshotOrFlush() {
return effectiveInfoRatio < config.getWalMinEffectiveInfoRatio()
@@ -447,7 +441,7 @@ public class WALNode implements IWALNode {
return false;
}
// find oldest memTable
- MemTableInfo oldestMemTableInfo =
checkpointManager.getOldestMemTableInfo();
+ MemTableInfo oldestMemTableInfo =
checkpointManager.getOldestUnpinnedMemTableInfo();
if (oldestMemTableInfo == null) {
return false;
}
@@ -584,22 +578,25 @@ public class WALNode implements IWALNode {
}
}
- public long initFirstValidWALVersionId() {
- long firstVersionId = checkpointManager.getFirstValidWALVersionId();
- // This means that the relevant memTable in the file has been
successfully flushed, so we
- // should scroll through a new wal file so that the current file can be
deleted
- if (firstVersionId == Long.MIN_VALUE) {
- // roll wal log writer to delete current wal file
- if (buffer.getCurrentWALFileSize() > 0) {
- rollWALFile();
- }
- // update firstValidVersionId
- firstVersionId = checkpointManager.getFirstValidWALVersionId();
- if (firstVersionId == Long.MIN_VALUE) {
- firstVersionId = buffer.getCurrentWALFileVersion();
- }
+ public boolean isContainsActiveOrPinnedMemTable(Long versionId) {
+ Set<Long> memTableIdsOfCurrentWal = buffer.getMemTableIds(versionId);
+ // If this set is empty, there is a case where WalEntry has been logged
but not persisted,
+ // because WalEntry is persisted asynchronously. In this case, the file
cannot be deleted
+ // directly, so it is considered active
+ if (memTableIdsOfCurrentWal == null ||
memTableIdsOfCurrentWal.isEmpty()) {
+ return true;
}
- return firstVersionId;
+ return !Collections.disjoint(
+ activeOrPinnedMemTables.stream()
+ .map(MemTableInfo::getMemTableId)
+ .collect(Collectors.toSet()),
+ memTableIdsOfCurrentWal);
+ }
+
+ private boolean canDeleteFile(long searchIndex, WALFileStatus
walFileStatus, long versionId) {
+ return (searchIndex < safelyDeletedSearchIndex
+ || walFileStatus == WALFileStatus.CONTAINS_NONE_SEARCH_INDEX)
+ && !isContainsActiveOrPinnedMemTable(versionId);
}
}
@@ -991,4 +988,9 @@ public class WALNode implements IWALNode {
public void setBufferSize(int size) {
buffer.setBufferSize(size);
}
+
+ @TestOnly
+ public WALBuffer getWALBuffer() {
+ return buffer;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
index 3d9fd3c051a..94fab3181e4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader;
import
org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -40,11 +41,16 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
@@ -74,6 +80,12 @@ public class WALNodeRecoverTask implements Runnable {
@Override
public void run() {
logger.info("Start recovering WAL node in the directory {}", logDirectory);
+
+ // recover version id and search index
+ long[] indexInfo = recoverLastFile();
+ long lastVersionId = indexInfo[0];
+ long lastSearchIndex = indexInfo[1];
+
try {
recoverInfoFromCheckpoints();
recoverTsFiles();
@@ -110,10 +122,6 @@ public class WALNodeRecoverTask implements Runnable {
logger.error("error when delete checkpoint file. {}",
checkpointFile, e);
}
}
- // recover version id and search index
- long[] indexInfo = recoverLastFile();
- long lastVersionId = indexInfo[0];
- long lastSearchIndex = indexInfo[1];
// register wal node
WALManager.getInstance()
.registerWALNode(
@@ -140,7 +148,7 @@ public class WALNodeRecoverTask implements Runnable {
File lastWALFile = walFiles[walFiles.length - 1];
long lastVersionId = WALFileUtils.parseVersionId(lastWALFile.getName());
long lastSearchIndex =
WALFileUtils.parseStartSearchIndex(lastWALFile.getName());
- WALMetaData metaData = new WALMetaData(lastSearchIndex, new ArrayList<>());
+ WALMetaData metaData = new WALMetaData(lastSearchIndex, new ArrayList<>(),
new HashSet<>());
WALFileStatus fileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
try (WALReader walReader = new WALReader(lastWALFile, true)) {
while (walReader.hasNext()) {
@@ -155,7 +163,7 @@ public class WALNodeRecoverTask implements Runnable {
fileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
}
}
- metaData.add(walEntry.serializedSize(), searchIndex);
+ metaData.add(walEntry.serializedSize(), searchIndex,
walEntry.getMemTableId());
}
} catch (Exception e) {
logger.warn("Fail to read wal logs from {}, skip them", lastWALFile, e);
@@ -235,20 +243,27 @@ public class WALNodeRecoverTask implements Runnable {
// read .wal files and redo logs
for (int i = 0; i < walFiles.length; ++i) {
File walFile = walFiles[i];
- // last wal file may corrupt
- try (WALReader walReader = new WALReader(walFile, i == walFiles.length -
1)) {
- while (walReader.hasNext()) {
- WALEntry walEntry = walReader.next();
- if (!memTableId2Info.containsKey(walEntry.getMemTableId())) {
+ try (WALByteBufReader reader = new WALByteBufReader(walFile)) {
+ if (Collections.disjoint(memTableId2Info.keySet(),
reader.getMetaData().getMemTablesId())) {
+ continue;
+ }
+ while (reader.hasNext()) {
+ ByteBuffer buffer = reader.next();
+ // see WALInfoEntry#serialize, entry type
+ buffer.position(Byte.BYTES);
+ long memTableId = buffer.getLong();
+ if (!memTableId2Info.containsKey(memTableId)) {
continue;
}
-
+ buffer.clear();
+ WALEntry walEntry =
+ WALEntry.deserialize(new DataInputStream(new
ByteArrayInputStream(buffer.array())));
UnsealedTsFileRecoverPerformer recoverPerformer =
memTableId2RecoverPerformer.get(walEntry.getMemTableId());
if (recoverPerformer != null) {
recoverPerformer.redoLog(walEntry);
} else {
- logger.warn(
+ logger.debug(
"Fail to find TsFile recover performer for wal entry in TsFile
{}", walFile);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 46707569e56..ff348415cba 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -1099,6 +1099,8 @@ public class DataRegionTest {
FlushManager flushManager = FlushManager.getInstance();
// flush the sequence memtable
+ tsFileProcessor.getWorkMemTable().getUpdateTime();
+ Thread.sleep(500);
dataRegion.timedFlushSeqMemTable();
// wait until memtable flush task is done
@@ -1154,6 +1156,8 @@ public class DataRegionTest {
FlushManager flushManager = FlushManager.getInstance();
// flush the unsequence memtable
+ tsFileProcessor.getWorkMemTable().getUpdateTime();
+ Thread.sleep(500);
dataRegion.timedFlushUnseqMemTable();
// wait until memtable flush task is done
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
index 08feee58228..58e9aefec32 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALEntryHandlerTest.java
@@ -124,9 +124,12 @@ public class WALEntryHandlerTest {
// pin memTable
WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
- walNode1.onMemTableFlushed(memTable);
// roll wal file
walNode1.rollWALFile();
+ InsertRowNode node2 = getInsertRowNode(devicePath,
System.currentTimeMillis());
+ node2.setSearchIndex(2);
+ walNode1.log(memTable.getMemTableId(), node2);
+ walNode1.onMemTableFlushed(memTable);
walNode1.rollWALFile();
// find node1
ConsensusReqReader.ReqIterator itr = walNode1.getReqIterator(1);
@@ -173,13 +176,11 @@ public class WALEntryHandlerTest {
// unpin 1
CheckpointManager checkpointManager = walNode1.getCheckpointManager();
handler.unpinMemTable();
- MemTableInfo oldestMemTableInfo =
checkpointManager.getOldestMemTableInfo();
- assertEquals(memTable.getMemTableId(), oldestMemTableInfo.getMemTableId());
- assertNull(oldestMemTableInfo.getMemTable());
- assertTrue(oldestMemTableInfo.isPinned());
+ MemTableInfo oldestMemTableInfo =
checkpointManager.getOldestUnpinnedMemTableInfo();
+ assertNull(oldestMemTableInfo);
// unpin 2
handler.unpinMemTable();
- assertNull(checkpointManager.getOldestMemTableInfo());
+ assertNull(checkpointManager.getOldestUnpinnedMemTableInfo());
}
@Test
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
new file mode 100644
index 00000000000..c528965b58f
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WalDeleteOutdatedNewTest.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.wal.node;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALInsertNodeCache;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+public class WalDeleteOutdatedNewTest {
+ private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ private static final String identifier1 = String.valueOf(Integer.MAX_VALUE);
+ private static final String logDirectory1 =
TestConstant.BASE_OUTPUT_PATH.concat("1/2910/");
+ private static final String databasePath = "root.test_sg";
+ private static final String devicePath = databasePath + ".test_d";
+ private static final String dataRegionId = "1";
+ private WALMode prevMode;
+ private boolean prevIsClusterMode;
+ private WALNode walNode1;
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.cleanDir(logDirectory1);
+ prevMode = config.getWalMode();
+ prevIsClusterMode = config.isClusterMode();
+ config.setWalMode(WALMode.SYNC);
+ config.setClusterMode(true);
+ walNode1 = new WALNode(identifier1, logDirectory1);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ walNode1.close();
+ config.setWalMode(prevMode);
+ config.setClusterMode(prevIsClusterMode);
+ EnvironmentUtils.cleanDir(logDirectory1);
+
+ WALInsertNodeCache.getInstance(1).clear();
+ }
+
+ /**
+ * This simulates writing to the last wal file. Serialization and disk
flushing operations are
+ * asynchronous, so we have to wait until all entries are consumed to get the
correct result:
+ * if memTableIdsOfWal is read before the WALEntries are consumed, the result
is not accurate.
+ * Therefore, the actual deletion of outdated wal files does not touch the
last wal file.
+ */
+ @Test
+ public void test01() throws IllegalPathException {
+ IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+
+ IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 4));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 5));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 6));
+
+ Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+ Map<Long, Set<Long>> memTableIdsOfWal =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ Assert.assertEquals(1, memTableIdsOfWal.size());
+ Assert.assertEquals(2, memTableIdsOfWal.get(0L).size());
+ Assert.assertEquals(1, WALFileUtils.listAllWALFiles(new
File(logDirectory1)).length);
+ walNode1.close();
+ }
+
+ /**
+ * Ensure that the memtableIds maintained by each wal file are accurate:<br>
+ * 1. _0-1-1.wal:memTable0、memTable1 <br>
+ * 2. roll wal file <br>
+ * 3. _1-6-1.wal: memTable1 <br>
+ * 4. wait until all walEntry consumed
+ */
+ @Test
+ public void test02() throws IllegalPathException {
+ IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+
+ IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 4));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 5));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 6));
+
+ walNode1.rollWALFile();
+ walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 7));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 8));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 9));
+ Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+ Map<Long, Set<Long>> memTableIdsOfWal =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ Assert.assertEquals(2, memTableIdsOfWal.size());
+ Assert.assertEquals(1, memTableIdsOfWal.get(1L).size());
+ Assert.assertEquals(2, WALFileUtils.listAllWALFiles(new
File(logDirectory1)).length);
+ }
+
+ /**
+ * Ensure that files that can be cleaned can be deleted: <br>
+ * 1. _0-0-1.wal: memTable0 、 memTable1 <br>
+ * 2. roll wal file <br>
+ * 3. _1-1-1.wal: memTable1 <br>
+ * 4. wait until all walEntry consumed <br>
+ * 5. memTable0 flush, memTable1 flush <br>
+ * 6. delete outdated wal files
+ */
+ @Test
+ public void test03() throws IllegalPathException {
+
+ IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable0.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+
+ IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+ walNode1.rollWALFile();
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+
+ Map<Long, Set<Long>> memTableIdsOfWal =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ walNode1.onMemTableFlushed(memTable0);
+ walNode1.onMemTableFlushed(memTable1);
+ Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+ // before deleted
+ Assert.assertEquals(2, memTableIdsOfWal.size());
+ Assert.assertEquals(2, memTableIdsOfWal.get(0L).size());
+ File[] files = WALFileUtils.listAllWALFiles(new File(logDirectory1));
+ Assert.assertEquals(2, files.length);
+
+ walNode1.deleteOutdatedFiles();
+ Map<Long, Set<Long>> memTableIdsOfWalAfter =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+
+ // after deleted
+ Assert.assertEquals(0, memTableIdsOfWalAfter.size());
+ File[] filesAfter = WALFileUtils.listAllWALFiles(new File(logDirectory1));
+ Assert.assertEquals(1, filesAfter.length);
+ }
+
+ /**
+ * Ensure that files that can be cleaned can be deleted: <br>
+ * 1. _0-0-1.wal: memTable0 <br>
+ * 2. roll wal file <br>
+ * 3. _1-1-0.wal: memTable1 <br>
+ * 4. roll wal file <br>
+ * 5. _2-1-1.wal: memTable2 <br>
+ * 6. wait until all walEntry consumed <br>
+ * 7. memTable0 flush, memTable1 flush, memTable2 flush <br>
+ * 8. delete outdated wal files
+ */
+ @Test
+ public void test04() throws IllegalPathException {
+ IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable0.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+ walNode1.rollWALFile();
+
+ IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+ walNode1.rollWALFile();
+
+ IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+ walNode1.onMemTableFlushed(memTable2);
+ walNode1.onMemTableFlushed(memTable0);
+ walNode1.onMemTableFlushed(memTable1);
+ Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+ Map<Long, Set<Long>> memTableIdsOfWal =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ Assert.assertEquals(3, memTableIdsOfWal.size());
+ Assert.assertEquals(3, WALFileUtils.listAllWALFiles(new
File(logDirectory1)).length);
+
+ walNode1.deleteOutdatedFiles();
+ Map<Long, Set<Long>> memTableIdsOfWalAfter =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ Assert.assertEquals(0, memTableIdsOfWalAfter.size());
+ Assert.assertEquals(1, WALFileUtils.listAllWALFiles(new
File(logDirectory1)).length);
+ }
+
+ /**
+ * Ensure that wal files containing a pinned memTable cannot be deleted: <br>
+ * 1. _0-0-1.wal: memTable0 <br>
+ * 2. pin memTable0 <br>
+ * 3. memTable0 flush <br>
+ * 4. roll wal file <br>
+ * 5. _1-1-1.wal: memTable0、memTable1 <br>
+ * 6. roll wal file <br>
+ * 7. _2-1-1.wal: memTable1 <br>
+ * 8. roll wal file <br>
+ * 9. _3-1-1.wal: memTable1 <br>
+ * 10. wait until all walEntry consumed <br>
+ * 11. memTable0 flush, memTable1 flush <br>
+ * 12. delete outdated wal files
+ */
+ @Test
+ public void test05() throws IllegalPathException, MemTablePinException {
+ IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+ WALFlushListener listener =
+ walNode1.log(
+ memTable0.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+ walNode1.rollWALFile();
+
+ // pin memTable
+ WALEntryHandler handler = listener.getWalEntryHandler();
+ handler.pinMemTable();
+ walNode1.log(
+ memTable0.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+ IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+ walNode1.rollWALFile();
+
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 4));
+ walNode1.rollWALFile();
+
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 5));
+ walNode1.onMemTableFlushed(memTable0);
+ walNode1.onMemTableFlushed(memTable1);
+ Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+ Map<Long, Set<Long>> memTableIdsOfWal =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ Assert.assertEquals(4, memTableIdsOfWal.size());
+ Assert.assertEquals(4, WALFileUtils.listAllWALFiles(new
File(logDirectory1)).length);
+
+ walNode1.deleteOutdatedFiles();
+ Map<Long, Set<Long>> memTableIdsOfWalAfter =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ Assert.assertEquals(3, memTableIdsOfWalAfter.size());
+ Assert.assertEquals(3, WALFileUtils.listAllWALFiles(new
File(logDirectory1)).length);
+ }
+
+ /**
+ * Ensure that wal files related to an unflushed memTable cannot be deleted: <br>
+ * 1. _0-0-1.wal: memTable0 <br>
+ * 2. roll wal file <br>
+ * 3. _1-1-1.wal: memTable0 <br>
+ * 4. roll wal file <br>
+ * 5. _2-1-1.wal: memTable0 <br>
+ * 6. roll wal file <br>
+ * 7. _3-1-1.wal: memTable0 <br>
+ * 8. wait until all walEntry consumed <br>
+ * 9. delete outdated wal files
+ */
+ @Test
+ public void test06() throws IllegalPathException {
+ IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable0.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+ walNode1.rollWALFile();
+ walNode1.log(
+ memTable0.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+ walNode1.rollWALFile();
+ walNode1.log(
+ memTable0.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+ walNode1.rollWALFile();
+ walNode1.log(
+ memTable0.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 4));
+ Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+ Map<Long, Set<Long>> memTableIdsOfWal =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ Assert.assertEquals(4, memTableIdsOfWal.size());
+ Assert.assertEquals(4, WALFileUtils.listAllWALFiles(new
File(logDirectory1)).length);
+
+ walNode1.deleteOutdatedFiles();
+ Map<Long, Set<Long>> memTableIdsOfWalAfter =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ Assert.assertEquals(4, memTableIdsOfWalAfter.size());
+ Assert.assertEquals(4, WALFileUtils.listAllWALFiles(new
File(logDirectory1)).length);
+ }
+
+ /**
+ * Ensure that files that can be cleaned can be deleted: <br>
+ * 1. _0-0-1.wal: memTable0 <br>
+ * 2. roll wal file <br>
+ * 3. _1-1-0.wal: memTable1, memTable2 <br>
+ * 4. roll wal file <br>
+ * 5. _2-1-0.wal: memTable2 <br>
+ * 6. roll wal file <br>
+ * 7. _3-1-0.wal: memTable3 <br>
+ * 8. roll wal file <br>
+ * 9. _4-1-0.wal: memTable3 <br>
+ * 10. memTable1 flush, memTable2 flush, memTable3 flush <br>
+ * 11. wait until all walEntry consumed <br>
+ * 12. delete outdated wal files
+ */
+ @Test
+ public void test07() throws IllegalPathException {
+ IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable0.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+ walNode1.rollWALFile();
+
+ IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+
+ IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+ walNode1.rollWALFile();
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+ walNode1.rollWALFile();
+ IMemTable memTable3 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable3, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable3.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+ walNode1.rollWALFile();
+ walNode1.log(
+ memTable3.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+ walNode1.onMemTableFlushed(memTable1);
+ walNode1.onMemTableFlushed(memTable2);
+ walNode1.onMemTableFlushed(memTable3);
+ Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+ Map<Long, Set<Long>> memTableIdsOfWal =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ Assert.assertEquals(5, memTableIdsOfWal.size());
+ Assert.assertEquals(5, WALFileUtils.listAllWALFiles(new
File(logDirectory1)).length);
+
+ walNode1.deleteOutdatedFiles();
+ Map<Long, Set<Long>> memTableIdsOfWalAfter =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ Assert.assertEquals(2, memTableIdsOfWalAfter.size());
+ Assert.assertEquals(2, WALFileUtils.listAllWALFiles(new
File(logDirectory1)).length);
+
+ walNode1.onMemTableFlushed(memTable0);
+ Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+ walNode1.deleteOutdatedFiles();
+ Map<Long, Set<Long>> memTableIdsOfWalAfterAfter =
walNode1.getWALBuffer().getMemTableIdsOfWal();
+ Assert.assertEquals(0, memTableIdsOfWalAfterAfter.size());
+ Assert.assertEquals(1, WALFileUtils.listAllWALFiles(new
File(logDirectory1)).length);
+ }
+
+ /**
+ * Ensure that files that can be cleaned can be deleted: <br>
+ * 1. _0-0-1.wal: memTable0 <br>
+ * 2. roll wal file <br>
+ * 3. _1-1-0.wal: memTable1 <br>
+ * 4. memTable1 flush <br>
+ * 5. roll wal file <br>
+ * 6. _2-1-0.wal: memTable2 <br>
+ * 7. wait until all walEntry consumed <br>
+ * 8. delete outdated wal files
+ */
+ @Test
+ public void test08() throws IllegalPathException {
+ IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable0.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+ walNode1.rollWALFile();
+
+ ConsensusReqReader.ReqIterator itr1 = walNode1.getReqIterator(1);
+ Assert.assertFalse(itr1.hasNext());
+
+ IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable1, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable1.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), -1));
+ walNode1.onMemTableFlushed(memTable1);
+ walNode1.rollWALFile();
+
+ ConsensusReqReader.ReqIterator itr2 = walNode1.getReqIterator(1);
+ Assert.assertTrue(itr2.hasNext());
+
+ IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+ Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+ ConsensusReqReader.ReqIterator itr3 = walNode1.getReqIterator(1);
+ Assert.assertTrue(itr3.hasNext());
+ walNode1.deleteOutdatedFiles();
+
+ ConsensusReqReader.ReqIterator itr4 = walNode1.getReqIterator(1);
+ Assert.assertFalse(itr4.hasNext());
+ walNode1.rollWALFile();
+ Assert.assertTrue(itr4.hasNext());
+ }
+
+ /**
+ * Ensure that files that can be cleaned can be deleted: <br>
+ * 1. _0-0-1.wal: memTable0 <br>
+ * 2. roll wal file <br>
+ * 3. _2-1-0.wal: memTable2 <br>
+ * 4. wait until all walEntry consumed <br>
+ * 5. check that getReqIterator(1) has no next request
+ */
+ @Test
+ public void test09() throws IllegalPathException {
+ IMemTable memTable0 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable0, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable0.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 1));
+ walNode1.rollWALFile();
+
+ IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
+ walNode1.onMemTableCreated(memTable2, logDirectory1 + "/" + "fake.tsfile");
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 2));
+ walNode1.log(
+ memTable2.getMemTableId(),
+ generateInsertRowNode(devicePath, System.currentTimeMillis(), 3));
+ Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
+
+ ConsensusReqReader.ReqIterator itr3 = walNode1.getReqIterator(1);
+ Assert.assertFalse(itr3.hasNext());
+ }
+
+ public static InsertRowNode generateInsertRowNode(String devicePath, long
time, long searchIndex)
+ throws IllegalPathException {
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.INT32,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT
+ };
+
+ Object[] columns = new Object[6];
+ columns[0] = 1.0d;
+ columns[1] = 2f;
+ columns[2] = 10000L;
+ columns[3] = 100;
+ columns[4] = false;
+ columns[5] = new Binary("hh" + 0, TSFileConfig.STRING_CHARSET);
+
+ InsertRowNode node =
+ new InsertRowNode(
+ new PlanNodeId(""),
+ new PartialPath(devicePath),
+ false,
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes,
+ time,
+ columns,
+ false);
+ MeasurementSchema[] schemas = new MeasurementSchema[6];
+ for (int i = 0; i < 6; i++) {
+ schemas[i] = new MeasurementSchema("s" + (i + 1), dataTypes[i]);
+ }
+ node.setMeasurementSchemas(schemas);
+ node.setSearchIndex(searchIndex);
+ return node;
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
index 234dec46c26..368cc967ea2 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
@@ -48,6 +48,7 @@ import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
+import java.util.HashSet;
public class WALRecoverWriterTest {
private final File logFile =
@@ -65,7 +66,7 @@ public class WALRecoverWriterTest {
// prepare file
logFile.createNewFile();
long firstSearchIndex =
WALFileUtils.parseStartSearchIndex(logFile.getName());
- WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new
ArrayList<>());
+ WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new
ArrayList<>(), new HashSet<>());
// recover
WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
walRecoverWriter.recover(walMetaData);
@@ -87,7 +88,7 @@ public class WALRecoverWriterTest {
stream.write(1);
}
long firstSearchIndex =
WALFileUtils.parseStartSearchIndex(logFile.getName());
- WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new
ArrayList<>());
+ WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new
ArrayList<>(), new HashSet<>());
// recover
WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
walRecoverWriter.recover(walMetaData);
@@ -109,7 +110,7 @@ public class WALRecoverWriterTest {
int size = walEntry.serializedSize();
WALByteBufferForTest buffer = new
WALByteBufferForTest(ByteBuffer.allocate(size));
walEntry.serialize(buffer);
- walMetaData.add(size, 1);
+ walMetaData.add(size, 1, walEntry.getMemTableId());
try (WALWriter walWriter = new WALWriter(logFile)) {
walWriter.write(buffer.getBuffer(), walMetaData);
}
@@ -133,7 +134,7 @@ public class WALRecoverWriterTest {
int size = walEntry.serializedSize();
WALByteBufferForTest buffer = new
WALByteBufferForTest(ByteBuffer.allocate(size));
walEntry.serialize(buffer);
- walMetaData.add(size, 1);
+ walMetaData.add(size, 1, walEntry.getMemTableId());
try (WALWriter walWriter = new WALWriter(logFile)) {
walWriter.write(buffer.getBuffer(), walMetaData);
}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 162af1852a0..43d431a7a2d 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -488,31 +488,31 @@ data_replication_factor=1
# Datatype: boolean
# enable_timed_flush_seq_memtable=true
-# If a memTable's created time is older than current time minus this, the
memtable will be flushed to disk.
+# If a memTable's last update time is older than current time minus this, the
memtable will be flushed to disk.
# Only check sequence tsfiles' memtables.
-# The default flush interval is 3 * 60 * 60 * 1000. (unit: ms)
+# The default flush interval is 10 * 60 * 1000. (unit: ms)
# Datatype: long
-# seq_memtable_flush_interval_in_ms=10800000
+# seq_memtable_flush_interval_in_ms=600000
# The interval to check whether sequence memtables need flushing.
-# The default flush check interval is 10 * 60 * 1000. (unit: ms)
+# The default flush check interval is 30 * 1000. (unit: ms)
# Datatype: long
-# seq_memtable_flush_check_interval_in_ms=600000
+# seq_memtable_flush_check_interval_in_ms=30000
# Whether to timed flush unsequence tsfiles' memtables.
# Datatype: boolean
# enable_timed_flush_unseq_memtable=true
-# If a memTable's created time is older than current time minus this, the
memtable will be flushed to disk.
+# If a memTable's last update time is older than current time minus this, the
memtable will be flushed to disk.
# Only check unsequence tsfiles' memtables.
-# The default flush interval is 3 * 60 * 60 * 1000. (unit: ms)
+# The default flush interval is 10 * 60 * 1000. (unit: ms)
# Datatype: long
-# unseq_memtable_flush_interval_in_ms=10800000
+# unseq_memtable_flush_interval_in_ms=600000
# The interval to check whether unsequence memtables need flushing.
-# The default flush check interval is 10 * 60 * 1000. (unit: ms)
+# The default flush check interval is 30 * 1000. (unit: ms)
# Datatype: long
-# unseq_memtable_flush_check_interval_in_ms=600000
+# unseq_memtable_flush_check_interval_in_ms=30000
# The sort algorithms used in the memtable's TVList
# TIM: default tim sort,