This is an automated email from the ASF dual-hosted git repository. spricoder pushed a commit to branch research/deferred_strategy in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 64c5f9ba847aff2169a8b50626dbd1857635bfc9 Author: LittleHealth <[email protected]> AuthorDate: Sun Oct 15 00:27:38 2023 +0800 [Memtable] implement the deffered strategy --- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 7 +- .../db/storageengine/dataregion/DataRegion.java | 8 +- .../dataregion/flush/MemTableFlushTask.java | 13 ++- .../dataregion/memtable/AbstractMemTable.java | 64 ++++++++++++- .../memtable/AlignedWritableMemChunk.java | 14 +++ .../memtable/AlignedWritableMemChunkGroup.java | 13 +++ .../dataregion/memtable/IMemTable.java | 9 ++ .../dataregion/memtable/IWritableMemChunk.java | 9 ++ .../memtable/IWritableMemChunkGroup.java | 5 ++ .../dataregion/memtable/PrimitiveMemTable.java | 7 +- .../dataregion/memtable/TsFileProcessor.java | 81 +++++++++++++++-- .../dataregion/memtable/WritableMemChunk.java | 24 ++++- .../dataregion/memtable/WritableMemChunkGroup.java | 20 +++++ .../rescon/memory/PrimitiveArrayManager.java | 1 + .../db/utils/datastructure/AlignedTVList.java | 36 +++++++- .../iotdb/db/utils/datastructure/BackwardSort.java | 4 +- .../iotdb/db/utils/datastructure/IntTVList.java | 100 ++++++++++++++++++--- .../db/utils/datastructure/QuickAlignedTVList.java | 7 +- .../db/utils/datastructure/QuickBinaryTVList.java | 7 +- .../db/utils/datastructure/QuickBooleanTVList.java | 7 +- .../db/utils/datastructure/QuickDoubleTVList.java | 7 +- .../db/utils/datastructure/QuickFloatTVList.java | 7 +- .../db/utils/datastructure/QuickIntTVList.java | 7 +- .../db/utils/datastructure/QuickLongTVList.java | 7 +- .../iotdb/db/utils/datastructure/QuickSort.java | 6 +- .../iotdb/db/utils/datastructure/TVList.java | 16 ++++ .../db/utils/datastructure/TimAlignedTVList.java | 9 +- .../db/utils/datastructure/TimBinaryTVList.java | 9 +- .../db/utils/datastructure/TimBooleanTVList.java | 9 +- .../db/utils/datastructure/TimDoubleTVList.java | 9 +- .../db/utils/datastructure/TimFloatTVList.java | 9 +- .../iotdb/db/utils/datastructure/TimIntTVList.java | 20 +++-- .../db/utils/datastructure/TimLongTVList.java | 9 +- .../iotdb/db/utils/datastructure/TimSort.java | 10 +-- .../TopkDivideMemoryNotEnoughException.java | 8 ++ .../resources/conf/iotdb-common.properties | 5 ++ 36 files changed, 511 insertions(+), 72 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 7850bf6b103..c534cb1f715 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -419,6 +419,11 @@ public class IoTDBDescriptor { properties.getProperty( "tvlist_sort_algorithm", conf.getTvListSortAlgorithm().toString()))); + conf.setSeqMemtableTopKSize( + (Integer.parseInt( + properties.getProperty( + "seq_memtable_topk_size", Integer.toString(conf.getSeqMemtableTopKSize()))))); + conf.setAvgSeriesPointNumberThreshold( Integer.parseInt( properties.getProperty( @@ -1704,7 +1709,7 @@ public class IoTDBDescriptor { logger.info("initial allocateMemoryForWrite = {}", conf.getAllocateMemoryForStorageEngine()); logger.info("initial allocateMemoryForSchema = {}", conf.getAllocateMemoryForSchema()); logger.info("initial allocateMemoryForConsensus = {}", conf.getAllocateMemoryForConsensus()); - + logger.info("initial seqMemtableTopKSize = {}", conf.getSeqMemtableTopKSize()); initSchemaMemoryAllocate(properties); initStorageEngineAllocate(properties); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 75609c20f93..f33f19557f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1267,7 +1267,6 @@ public class DataRegion implements IDataRegionForQuery { version, 0, 0); - return getTsFileProcessor(sequence, filePath, timePartitionId); } @@ -1393,6 +1392,7 @@ public class DataRegion implements IDataRegionForQuery { if (!workSequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) { timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId()); } + logger.info("close an unsequence tsfile processor {}", databaseName + "-" + dataRegionId); } } @@ -1557,7 +1557,8 @@ public class DataRegion implements IDataRegionForQuery { for (TsFileProcessor tsFileProcessor : tsFileProcessors) { if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) { logger.info( - "Exceed sequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]", + "Exceed sequence memtable {} flush interval, so flush working memtable of time partition {} in database {}[{}]", + tsFileProcessor.getWorkMemTable().getMemTableId(), tsFileProcessor.getTimeRangeId(), databaseName, dataRegionId); @@ -1580,7 +1581,8 @@ public class DataRegion implements IDataRegionForQuery { for (TsFileProcessor tsFileProcessor : tsFileProcessors) { if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) { logger.info( - "Exceed unsequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]", + "Exceed unsequence memtable {} flush interval, so flush working memtable of time partition {} in database {}[{}]", + tsFileProcessor.getWorkMemTable().getMemTableId(), tsFileProcessor.getTimeRangeId(), databaseName, dataRegionId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index 19450cbe9fc..aa84bface67 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -111,7 +111,8 @@ public class MemTableFlushTask { ? 0 : memTable.getTotalPointsNum() / memTable.getSeriesNumber(); LOGGER.info( - "The memTable size of SG {} is {}, the avg series points num in chunk is {}, total timeseries number is {}", + "The memTable {} size of SG {} is {}, the avg series points num in chunk is {}, total timeseries number is {}", + memTable.getMemTableId(), storageGroup, memTable.memSize(), avgSeriesPointsNum, @@ -215,8 +216,9 @@ public class MemTableFlushTask { "flush"); LOGGER.info( - "Database {} memtable {} flushing a memtable has finished! Time consumption: {}ms", + "Database {} flushing memtable {} with {} has finished! Time consumption: {}ms", storageGroup, + memTable.getMemTableId(), memTable, System.currentTimeMillis() - start); } @@ -286,8 +288,13 @@ public class MemTableFlushTask { recordFlushPointsMetric(); LOGGER.info( - "Database {}, flushing memtable {} into disk: Encoding data cost " + "{} ms.", + "Database {}, flushing memtable {} with size {} and points {} in file " + + "{} into disk: Encoding data cost " + + "{} ms.", storageGroup, + memTable.getMemTableId(), + memTable.memSize(), + memTable.getTotalPointsNum(), writer.getFile().getName(), memSerializeTime); WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_ENCODING, memSerializeTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index bdba167c5bb..3af36c60209 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; import org.apache.iotdb.db.utils.MemUtils; +import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Pair; @@ -85,24 +86,48 @@ public abstract class AbstractMemTable implements IMemTable { private long totalPointsNum = 0; + public long getTotalPointsNumThreshold() { + return totalPointsNumThreshold; + } + private long totalPointsNumThreshold = 0; private long maxPlanIndex = Long.MIN_VALUE; private long minPlanIndex = Long.MAX_VALUE; - private final long memTableId = memTableIdCounter.incrementAndGet(); + private long memTableId; - private final long createdTime = System.currentTimeMillis(); + private long createdTime; private static final String METRIC_POINT_IN = "pointsIn"; protected AbstractMemTable() { this.memTableMap = new HashMap<>(); + createdTime = System.currentTimeMillis(); + memTableId = memTableIdCounter.incrementAndGet(); } - protected AbstractMemTable(Map<IDeviceID, IWritableMemChunkGroup> memTableMap) { + protected AbstractMemTable( + Map<IDeviceID, IWritableMemChunkGroup> memTableMap, AbstractMemTable lastMemTable) { this.memTableMap = memTableMap; + for (IDeviceID key : memTableMap.keySet()) { + Map<String, IWritableMemChunk> memChunkMap = memTableMap.get(key).getMemChunkMap(); + for (String schema : memChunkMap.keySet()) { + IWritableMemChunk memChunk = memChunkMap.get(schema); + totalPointsNum += memChunk.getTVList().rowCount(); + memSize += + (long) memChunk.getTVList().rowCount() + * memChunk.getSchema().getType().getDataTypeSize(); + } + } + totalPointsNumThreshold = lastMemTable.getTotalPointsNumThreshold(); + createdTime = System.currentTimeMillis(); + memTableId = memTableIdCounter.incrementAndGet(); + seriesNumber = lastMemTable.getSeriesNumber(); + tvListRamCost = lastMemTable.getTVListsRamCost() * totalPointsNum / lastMemTable.totalPointsNum; + shouldFlush = false; + flushStatus = FlushStatus.WORKING; } @Override @@ -402,6 +427,16 @@ public abstract class AbstractMemTable implements IMemTable { return memSize; } + @Override + public void setMemSize(long m) { + memSize = memSize + m; + } + + @Override + public void setTotalPointsNum(long m) { + totalPointsNum = totalPointsNum + m; + } + @Override public boolean reachTotalPointNumThreshold() { if (totalPointsNum == 0) { @@ -489,6 +524,20 @@ public abstract class AbstractMemTable implements IMemTable { } } + @Override + public IMemTable divide() throws TopkDivideMemoryNotEnoughException { + Map<IDeviceID, IWritableMemChunkGroup> topkMemTableMap = new HashMap<>(); + for (IDeviceID key : memTableMap.keySet()) { + IWritableMemChunkGroup chunkGroupTemp = memTableMap.get(key).divide(); + topkMemTableMap.put(key, chunkGroupTemp); + } + PrimitiveMemTable topkMemtable = new PrimitiveMemTable(topkMemTableMap, this); + memSize -= topkMemtable.memSize(); + totalPointsNum -= topkMemtable.getTotalPointsNum(); + tvListRamCost -= topkMemtable.getTVListsRamCost(); + return topkMemtable; + } + @Override public void addTVListRamCost(long cost) { this.tvListRamCost += cost; @@ -634,6 +683,15 @@ public abstract class AbstractMemTable implements IMemTable { return latestTimeForEachDevice; } + @Override + public Map<String, Long> getTopKTime() { + Map<String, Long> latestTimeForEachDevice = new HashMap<>(); + for (Entry<IDeviceID, IWritableMemChunkGroup> entry : memTableMap.entrySet()) { + latestTimeForEachDevice.put(entry.getKey().toStringID(), entry.getValue().getTopKTime()); + } + return latestTimeForEachDevice; + } + public static class Factory { private Factory() { // Empty constructor diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index aa4b3c4d6ed..1046ec0e296 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -23,6 +23,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferVie import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; +import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -251,11 +252,19 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { return null; } + @Override + public void setSchema(IMeasurementSchema s) {} + @Override public long getMaxTime() { return list.getMaxTime(); } + @Override + public long getTopKTime() { + return list.getTopKTime(); + } + @Override public synchronized TVList getSortedTvListForQuery() { sortTVList(); @@ -464,6 +473,11 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { .getTimestamp(); } + @Override + public IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException { + return new AlignedWritableMemChunk(schemaList, (AlignedTVList) list.divide()); + } + @Override public boolean isEmpty() { return list.rowCount() == 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java index 17b12a748da..875c1130692 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternUtil; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; +import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException; import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; @@ -143,6 +144,18 @@ public class AlignedWritableMemChunkGroup implements IWritableMemChunkGroup { return memChunk.getMaxTime(); } + @Override + public long getTopKTime() { + return memChunk.getTopKTime(); + } + + @Override + public IWritableMemChunkGroup divide() throws TopkDivideMemoryNotEnoughException { + AlignedWritableMemChunkGroup topkMemChunkGroup = new AlignedWritableMemChunkGroup(); + topkMemChunkGroup.memChunk = (AlignedWritableMemChunk) memChunk.divide(); + return topkMemChunkGroup; + } + public AlignedWritableMemChunk getAlignedMemChunk() { return memChunk; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java index b4b0204080d..1f898c05a9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTablet import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus; import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; +import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; @@ -64,6 +65,10 @@ public interface IMemTable extends WALEntryValue { /** @return memory usage */ long memSize(); + void setMemSize(long m); + + void setTotalPointsNum(long m); + /** only used when mem control enabled */ void addTVListRamCost(long cost); @@ -128,6 +133,8 @@ public interface IMemTable extends WALEntryValue { */ void delete(PartialPath path, PartialPath devicePath, long startTimestamp, long endTimestamp); + /** divide the memtable into the topk and regular ones @ return the topk memtable. */ + IMemTable divide() throws TopkDivideMemoryNotEnoughException; /** * Make a copy of this MemTable. * @@ -169,4 +176,6 @@ public interface IMemTable extends WALEntryValue { void setFlushStatus(FlushStatus flushStatus); Map<String, Long> getMaxTime(); + + Map<String, Long> getTopKTime(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java index 4c07c3ece33..4b643cb35d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; import org.apache.iotdb.db.utils.datastructure.TVList; +import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; @@ -83,6 +84,8 @@ public interface IWritableMemChunk extends WALEntryValue { IMeasurementSchema getSchema(); + void setSchema(IMeasurementSchema s); + /** * served for read requests. * @@ -128,6 +131,10 @@ public interface IWritableMemChunk extends WALEntryValue { return Long.MAX_VALUE; } + default long getTopKTime() { + return Long.MAX_VALUE; + } + /** @return how many points are deleted */ int delete(long lowerBound, long upperBound); @@ -141,5 +148,7 @@ public interface IWritableMemChunk extends WALEntryValue { long getLastPoint(); + IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException; + boolean isEmpty(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java index ecaf8b2b738..eb3c1f0854d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; +import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException; import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; @@ -53,5 +54,9 @@ public interface IWritableMemChunkGroup extends WALEntryValue { long getCurrentTVListSize(String measurement); + IWritableMemChunkGroup divide() throws TopkDivideMemoryNotEnoughException; + long getMaxTime(); + + long getTopKTime(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java index d4bf9c18743..70f29dd0920 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java @@ -30,15 +30,16 @@ public class PrimitiveMemTable extends AbstractMemTable { this.disableMemControl = !enableMemControl; } - public PrimitiveMemTable(Map<IDeviceID, IWritableMemChunkGroup> memTableMap) { - super(memTableMap); + public PrimitiveMemTable( + Map<IDeviceID, IWritableMemChunkGroup> memTableMap, AbstractMemTable lastMemtable) { + super(memTableMap, lastMemtable); } @Override public IMemTable copy() { Map<IDeviceID, IWritableMemChunkGroup> newMap = new HashMap<>(getMemTableMap()); - return new PrimitiveMemTable(newMap); + return new PrimitiveMemTable(newMap, this); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 3cd09e73e92..28ca6cdac34 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -62,6 +62,7 @@ import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.MemUtils; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; +import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; @@ -91,6 +92,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.GET_QUERY_RESOURCE_FROM_MEM; import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.FLUSHING_MEMTABLE; import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORKING_MEMTABLE; +import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.MEMTABLE_TOPK_SIZE; @SuppressWarnings("java:S1135") // ignore todos public class TsFileProcessor { @@ -114,6 +116,8 @@ public class TsFileProcessor { /** sync this object in read() and asyncTryToFlush(). */ private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>(); + // private final ConcurrentLinkedDeque<IMemTable> waitingMemTables = new + // ConcurrentLinkedDeque<>(); /** modification to memtable mapping. */ private final List<Pair<Modification, IMemTable>> modsToMemtable = new ArrayList<>(); @@ -121,11 +125,14 @@ public class TsFileProcessor { /** writer for restore tsfile and flushing. */ private RestorableTsFileIOWriter writer; + private RestorableTsFileIOWriter writerTemp; + /** tsfile resource for index this tsfile. */ private final TsFileResource tsFileResource; /** time range index to indicate this processor belongs to which time range */ private long timeRangeId; + /** * Whether the processor is in the queue of the FlushManager or being flushed by a flush thread. */ @@ -175,6 +182,8 @@ public class TsFileProcessor { private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = PerformanceOverviewMetrics.getInstance(); + private File tsfileTemp; + @SuppressWarnings("squid:S107") public TsFileProcessor( String storageGroupName, @@ -308,6 +317,39 @@ public class TsFileProcessor { private void createNewWorkingMemTable() throws WriteProcessException { workMemTable = MemTableManager.getInstance().getAvailableMemTable(storageGroupName); walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath()); + logger.info( + "Succeed to create {} memTable {}", + sequence ? "Sequence" : "Unsequence", + workMemTable.getMemTableId()); + } + + private void createNewWorkingMemTable(IMemTable tobeFlushed) throws WriteProcessException { + if (MEMTABLE_TOPK_SIZE != 0) { + try { + logger.info( + "Try to divide memTable {} with size {} and points {}", + tobeFlushed.getMemTableId(), + tobeFlushed.memSize(), + tobeFlushed.getTotalPointsNum()); + workMemTable = tobeFlushed.divide(); + walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath()); + logger.info( + "Succeed to divide {} memTable {} with size {} and points {} to " + + "new workMemTable {} with size {} and points {} to", + sequence ? "Sequence" : "Unsequence", + tobeFlushed.getMemTableId(), + tobeFlushed.memSize(), + tobeFlushed.getTotalPointsNum(), + workMemTable.getMemTableId(), + workMemTable.memSize(), + workMemTable.getTotalPointsNum()); + } catch (TopkDivideMemoryNotEnoughException e) { + logger.warn(e.getMessage()); + createNewWorkingMemTable(); + } + } else { + createNewWorkingMemTable(); + } } /** @@ -858,15 +900,16 @@ public class TsFileProcessor { if (logger.isInfoEnabled()) { if (workMemTable != null) { logger.info( - "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile " - + "size: {}, plan index: [{}, {}], progress index: {}", + "{}: flush working memtable {} with size: {}, tsfile " + + "size: {}, plan index: [{}, {}], progress index: {} in async close tsfile {}", storageGroupName, - tsFileResource.getTsFile().getAbsolutePath(), + workMemTable.getMemTableId(), workMemTable.memSize(), tsFileResource.getTsFileSize(), workMemTable.getMinPlanIndex(), workMemTable.getMaxPlanIndex(), - tsFileResource.getMaxProgressIndex()); + tsFileResource.getMaxProgressIndex(), + tsFileResource.getTsFile().getAbsolutePath()); } else { logger.info( "{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}", @@ -900,7 +943,12 @@ public class TsFileProcessor { // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke // flushing memTable in System module. addAMemtableIntoFlushingList(tmpMemTable); - logger.info("Memtable {} has been added to flushing list", tmpMemTable); + logger.info( + "Memtable {}: {} with size {} and points {} has been added to flushing list", + tmpMemTable.getMemTableId(), + tmpMemTable, + tmpMemTable.memSize(), + tmpMemTable.getTotalPointsNum()); shouldClose = true; } catch (Exception e) { logger.error( @@ -982,7 +1030,9 @@ public class TsFileProcessor { return; } logger.info( - "Async flush a memtable to tsfile: {}", tsFileResource.getTsFile().getAbsolutePath()); + "Async flush a memtable {} to tsfile: {}", + workMemTable.getMemTableId(), + tsFileResource.getTsFile().getAbsolutePath()); addAMemtableIntoFlushingList(workMemTable); } catch (Exception e) { logger.error( @@ -1007,7 +1057,9 @@ public class TsFileProcessor { private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException { Map<String, Long> lastTimeForEachDevice = new HashMap<>(); if (sequence) { - lastTimeForEachDevice = tobeFlushed.getMaxTime(); + // TODO:修改lastFlushTime的取法 + lastTimeForEachDevice = tobeFlushed.getTopKTime(); + // lastTimeForEachDevice = tobeFlushed.getMaxTime(); // If some devices have been removed in MemTable, the number of device in MemTable and // tsFileResource will not be the same. And the endTime of these devices in resource will be // Long.minValue. @@ -1029,7 +1081,19 @@ public class TsFileProcessor { if (enableMemControl) { SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost()); } - flushingMemTables.addLast(tobeFlushed); + // TODO:如果是顺序区并且k>0,拆分之后分别进入不同的waitinglist中,否则的话仍然按照原方式刷盘 + if (!sequence || MEMTABLE_TOPK_SIZE == 0) { + flushingMemTables.addLast(tobeFlushed); + workMemTable = null; + } else { + try { + createNewWorkingMemTable(tobeFlushed); + } catch (WriteProcessException e) { + throw new RuntimeException(e); + } + flushingMemTables.addLast(tobeFlushed); + } + if (logger.isDebugEnabled()) { logger.debug( "{}: {} Memtable (signal = {}) is added into the flushing Memtable, queue size = {}", @@ -1042,7 +1106,6 @@ public class TsFileProcessor { if (!(tobeFlushed.isSignalMemTable() || tobeFlushed.isEmpty())) { totalMemTableSize += tobeFlushed.memSize(); } - workMemTable = null; FlushManager.getInstance().registerTsFileProcessor(this); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index 21ba12505b7..d954fe35bc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.utils.datastructure.TVList; +import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; @@ -49,7 +50,13 @@ public class WritableMemChunk implements IWritableMemChunk { this.list = TVList.newList(schema.getType()); } - private WritableMemChunk() {} + public WritableMemChunk(IMeasurementSchema schema, TVList ls) { + this.schema = schema; + this.list = ls; + } + + // private WritableMemChunk() {} + public WritableMemChunk() {} @Override public boolean writeWithFlushCheck(long insertTime, Object objectValue) { @@ -245,11 +252,21 @@ public class WritableMemChunk implements IWritableMemChunk { return schema; } + @Override + public void setSchema(IMeasurementSchema s) { + this.schema = s; + } + @Override public long getMaxTime() { return list.getMaxTime(); } + @Override + public long getTopKTime() { + return list.getTopKTime(); + } + @Override public long getFirstPoint() { if (list.rowCount() == 0) { @@ -268,6 +285,11 @@ public class WritableMemChunk implements IWritableMemChunk { .getTimestamp(); } + @Override + public IWritableMemChunk divide() throws TopkDivideMemoryNotEnoughException { + return new WritableMemChunk(this.schema, list.divide()); + } + @Override public boolean isEmpty() { return list.rowCount() == 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java index 707eace7fa5..dbe03f14c02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternUtil; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; +import org.apache.iotdb.db.utils.datastructure.TopkDivideMemoryNotEnoughException; import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; @@ -172,6 +173,25 @@ public class WritableMemChunkGroup implements IWritableMemChunkGroup { return maxTime; } + @Override + public long getTopKTime() { + long maxTime = Long.MIN_VALUE; + for (IWritableMemChunk memChunk : memChunkMap.values()) { + maxTime = Math.max(maxTime, memChunk.getTopKTime()); + } + return maxTime; + } + + @Override + public IWritableMemChunkGroup divide() throws TopkDivideMemoryNotEnoughException { + WritableMemChunkGroup topkMemChunkGroup = new WritableMemChunkGroup(); + for (String key : memChunkMap.keySet()) { + IWritableMemChunk topkMemChunk = memChunkMap.get(key).divide(); + topkMemChunkGroup.memChunkMap.put(key, topkMemChunk); + } + return topkMemChunkGroup; + } + @Override public int serializedSize() { int size = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java index 33264fc46c1..4f1edd83ca1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java @@ -40,6 +40,7 @@ public class PrimitiveArrayManager { private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); public static final int ARRAY_SIZE = CONFIG.getPrimitiveArraySize(); + public static final int MEMTABLE_TOPK_SIZE = CONFIG.getSeqMemtableTopKSize(); public static final TVListSortAlgorithm TVLIST_SORT_ALGORITHM = CONFIG.getTvListSortAlgorithm(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 95bbd231ab9..b84aec8ddd0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -43,8 +43,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; -import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; +import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.*; import static org.apache.iotdb.db.utils.MemUtils.getBinarySize; import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER; import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF; @@ -643,6 +642,37 @@ public abstract class AlignedTVList extends TVList { } } + @Override + public TVList divide() throws TopkDivideMemoryNotEnoughException { + if (MEMTABLE_TOPK_SIZE > rowCount || MEMTABLE_TOPK_SIZE < ARRAY_SIZE) { + throw new TopkDivideMemoryNotEnoughException( + String.format( + "WARNING: MEMTABLE_TOPK_SIZE is bigger than%d " + + "the TVList's row count %d" + + "or is smaller than ARRAY_SIZE %d", + rowCount, MEMTABLE_TOPK_SIZE, ARRAY_SIZE)); + } + AlignedTVList topkTVList = AlignedTVList.newAlignedList(dataTypes); + int truncatedIndex = (rowCount - MEMTABLE_TOPK_SIZE + ARRAY_SIZE - 1) / ARRAY_SIZE; + + for (int i = truncatedIndex + 1; i <= rowCount / ARRAY_SIZE; i++) { + topkTVList.timestamps.add(timestamps.get(i)); + for (int colIndex = 0; colIndex < values.size(); colIndex++) { + topkTVList.values.get(colIndex).add(values.get(colIndex).get(i)); + topkTVList.bitMaps.get(colIndex).add(bitMaps.get(colIndex).get(i)); + } + } + for (int i = rowCount / ARRAY_SIZE; i >= truncatedIndex + 1; i--) { + timestamps.remove(timestamps.size() - 1); + for (int colIndex = 0; colIndex < values.size(); colIndex++) { + topkTVList.values.get(colIndex).remove(values.get(colIndex).size() - 1); + topkTVList.bitMaps.get(colIndex).remove(bitMaps.get(colIndex).size() - 1); + } + } + rowCount = truncatedIndex * ARRAY_SIZE; + topKTime = timestamps.get(truncatedIndex - 1)[ARRAY_SIZE - 1]; + return topkTVList; + } /** * Get the row index value in index column. * @@ -745,6 +775,8 @@ public abstract class AlignedTVList extends TVList { checkExpansion(); } } + sort(Math.max(0, start - MEMTABLE_TOPK_SIZE), end); + topKTime = Math.max(topKTime, getTime(Math.max(0, end - MEMTABLE_TOPK_SIZE))); } private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex, int remaining) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java index 6271a31734e..21783bc0370 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BackwardSort.java @@ -53,7 +53,7 @@ public interface BackwardSort extends QuickSort { /** * check block-inversions to find the proper block_size, which is a multiple of array_size. For - * totally ordered, the block_size will equals to array_size For totally reverse ordered, the + * totally ordered, the block_size will equal to array_size For totally reverse ordered, the * block_size will equals to the rowCount. INVERSION_RATIOS_THRESHOLD=0.005 is a empiric value. * * @param timestamps @@ -132,6 +132,6 @@ public interface BackwardSort extends QuickSort { * @param hi */ default void sortBlock(int lo, int hi) { - qsort(lo, hi); + qSort(lo, hi); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java index c64dbc94fd8..1865efe6a9f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java @@ -34,8 +34,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; -import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; +import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.*; public abstract class IntTVList extends TVList { // list of primitive array, add 1 when expanded -> int primitive array @@ -77,14 +76,59 @@ public abstract class IntTVList extends TVList { @Override public void putInt(long timestamp, int value) { checkExpansion(); - int arrayIndex = rowCount / ARRAY_SIZE; - int elementIndex = rowCount % ARRAY_SIZE; - maxTime = Math.max(maxTime, timestamp); - timestamps.get(arrayIndex)[elementIndex] = timestamp; - values.get(arrayIndex)[elementIndex] = value; - rowCount++; - if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) { - sorted = false; + if (MEMTABLE_TOPK_SIZE == 0) { + int arrayIndex = rowCount / ARRAY_SIZE; + int elementIndex = rowCount % ARRAY_SIZE; + maxTime = Math.max(maxTime, timestamp); + topKTime = Math.max(topKTime, timestamp); + timestamps.get(arrayIndex)[elementIndex] = timestamp; + values.get(arrayIndex)[elementIndex] = value; + rowCount++; + if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) { + sorted = false; + } + } else { + int arrayIndex; + int elementIndex; + int waitlen = Math.min(rowCount, MEMTABLE_TOPK_SIZE); + int tempRowCount = rowCount - 1; // 记录向前比较的截止位置 + while (waitlen > 0) { + waitlen--; + arrayIndex = tempRowCount / ARRAY_SIZE; + elementIndex = tempRowCount % ARRAY_SIZE; + if (timestamps.get(arrayIndex)[elementIndex] > timestamp) { + tempRowCount--; + continue; + } + break; + } + arrayIndex = rowCount / ARRAY_SIZE; + elementIndex = rowCount % ARRAY_SIZE; + int arrayIndexLeft = (rowCount - 1) / ARRAY_SIZE; + int elementIndexLeft = (rowCount - 1) % ARRAY_SIZE; + for (int i = rowCount - 1; i > tempRowCount; i--) { + timestamps.get(arrayIndex)[elementIndex] = timestamps.get(arrayIndexLeft)[elementIndexLeft]; + values.get(arrayIndex)[elementIndex] = values.get(arrayIndexLeft)[elementIndexLeft]; + arrayIndex = arrayIndexLeft; + elementIndex = elementIndexLeft; + arrayIndexLeft = (i - 1) / ARRAY_SIZE; + elementIndexLeft = (i - 1) % ARRAY_SIZE; + sortCount++; + } + arrayIndex = (tempRowCount + 1) / ARRAY_SIZE; + elementIndex = (tempRowCount + 1) % ARRAY_SIZE; + maxTime = Math.max(maxTime, timestamp); + timestamps.get(arrayIndex)[elementIndex] = timestamp; + values.get(arrayIndex)[elementIndex] = value; + if (rowCount > MEMTABLE_TOPK_SIZE) { + arrayIndex = (rowCount - MEMTABLE_TOPK_SIZE - 1) / ARRAY_SIZE; + elementIndex = (rowCount - MEMTABLE_TOPK_SIZE - 1) % ARRAY_SIZE; + topKTime = Math.max(topKTime, timestamps.get(arrayIndex)[elementIndex]); + } + rowCount++; + if (sorted && rowCount > 1 && waitlen == 0) { + sorted = false; + } } } @@ -123,6 +167,37 @@ public abstract class IntTVList extends TVList { values.add((int[]) getPrimitiveArraysByType(TSDataType.INT32)); } + @Override + public TVList divide() throws TopkDivideMemoryNotEnoughException { + if (MEMTABLE_TOPK_SIZE > rowCount || MEMTABLE_TOPK_SIZE < ARRAY_SIZE) { + throw new TopkDivideMemoryNotEnoughException( + String.format( + "WARNING: MEMTABLE_TOPK_SIZE %d is bigger than " + + "the TVList's row count %d, " + + "or is smaller than ARRAY_SIZE %d", + MEMTABLE_TOPK_SIZE, rowCount, ARRAY_SIZE)); + } + IntTVList topkTVList = IntTVList.newList(); + int truncatedIndex = rowCount - MEMTABLE_TOPK_SIZE; + int truncatedArrayIndex = + truncatedIndex / ARRAY_SIZE; // no matter truncatedIndex in or not in the block + truncatedIndex = truncatedArrayIndex * ARRAY_SIZE; + for (int i = truncatedArrayIndex; i < timestamps.size(); i++) { + topkTVList.timestamps.add(timestamps.get(i)); + topkTVList.values.add(values.get(i)); + } + for (int i = timestamps.size() - 1; i >= truncatedArrayIndex; i--) { + timestamps.remove(timestamps.size() - 1); + values.remove(values.size() - 1); + } + topkTVList.rowCount = rowCount - truncatedIndex; + topkTVList.sorted = true; + topkTVList.topKTime = topkTVList.getTime(0); + rowCount = truncatedIndex; + topKTime = topkTVList.getTopKTime(); + return topkTVList; + } + @Override public TimeValuePair getTimeValuePair(int index) { return new TimeValuePair( @@ -201,6 +276,11 @@ public abstract class IntTVList extends TVList { checkExpansion(); } } + if (MEMTABLE_TOPK_SIZE != 0) { + sort(Math.max(0, start - MEMTABLE_TOPK_SIZE), rowCount); + int index = Math.max(0, rowCount - MEMTABLE_TOPK_SIZE - 1); + topKTime = Math.max(topKTime, getTime(index)); + } } // move null values to the end of time array and value array, then return number of null values diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java index 5e6a9dec2dc..d60067af576 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickAlignedTVList.java @@ -30,11 +30,16 @@ public class QuickAlignedTVList extends AlignedTVList implements QuickSort { @Override public void sort() { if (!sorted) { - qsort(0, rowCount - 1); + qSort(0, rowCount - 1); } sorted = true; } + @Override + public void sort(int lo, int hi) { + qSort(lo, hi); + } + @Override protected void set(int src, int dest) { long srcT = getTime(src); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java index dcdcbc2cb59..ee4e6f9cbc8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java @@ -42,11 +42,16 @@ public class QuickBinaryTVList extends BinaryTVList implements QuickSort { @Override public void sort() { if (!sorted) { - qsort(0, rowCount - 1); + qSort(0, rowCount - 1); } sorted = true; } + @Override + public void sort(int lo, int hi) { + qSort(lo, hi); + } + @Override protected void set(int src, int dest) { long srcT = getTime(src); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java index e4fbfaefca2..c7b43470551 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBooleanTVList.java @@ -39,11 +39,16 @@ public class QuickBooleanTVList extends BooleanTVList implements QuickSort { @Override public void sort() { if (!sorted) { - qsort(0, rowCount - 1); + qSort(0, rowCount - 1); } sorted = true; } + @Override + public void sort(int lo, int hi) { + qSort(lo, hi); + } + @Override protected void set(int src, int dest) { long srcT = getTime(src); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java index 852844f9231..584c8f8f789 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickDoubleTVList.java @@ -39,11 +39,16 @@ public class QuickDoubleTVList extends DoubleTVList implements QuickSort { @Override public void sort() { if (!sorted) { - qsort(0, rowCount - 1); + qSort(0, rowCount - 1); } sorted = true; } + @Override + public void sort(int lo, int hi) { + qSort(lo, hi); + } + @Override protected void set(int src, int dest) { long srcT = getTime(src); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java index 409a00a093e..90485257276 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickFloatTVList.java @@ -39,11 +39,16 @@ public class QuickFloatTVList extends FloatTVList implements QuickSort { @Override public void sort() { if (!sorted) { - qsort(0, rowCount - 1); + qSort(0, rowCount - 1); } sorted = true; } + @Override + public void sort(int lo, int hi) { + qSort(lo, hi); + } + @Override protected void set(int src, int dest) { long srcT = getTime(src); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java index bc44cf79053..d24842babac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickIntTVList.java @@ -22,11 +22,16 @@ public class QuickIntTVList extends IntTVList implements QuickSort { @Override public void sort() { if (!sorted) { - qsort(0, rowCount - 1); + qSort(0, rowCount - 1); } sorted = true; } + @Override + public void sort(int lo, int hi) { + qSort(lo, hi); + } + @Override public void swap(int p, int q) { int valueP = getInt(p); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java index 16d8fab3606..3d4f1ab1a7c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickLongTVList.java @@ -39,11 +39,16 @@ public class QuickLongTVList extends LongTVList implements QuickSort { @Override public void sort() { if (!sorted) { - qsort(0, rowCount - 1); + qSort(0, rowCount - 1); } sorted = true; } + @Override + public void sort(int lo, int hi) { + qSort(lo, hi); + } + @Override protected void set(int src, int dest) { long srcT = getTime(src); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java index 8229781aa05..9bad94d4f9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickSort.java @@ -58,7 +58,7 @@ public interface QuickSort { // } // } - default void qsort(int lo, int hi) { + default void qSort(int lo, int hi) { if (lo < hi) { // TODO: use insertion sort in smaller array // if(hi - lo <= 32) { @@ -66,8 +66,8 @@ public interface QuickSort { // } // partition int pivotIndex = partition(lo, hi); - qsort(lo, pivotIndex - 1); - qsort(pivotIndex + 1, hi); + qSort(lo, pivotIndex - 1); + qSort(pivotIndex + 1, hi); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 246102bb7eb..788af81cf89 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; +import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.MEMTABLE_TOPK_SIZE; import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER; import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF; @@ -58,6 +59,8 @@ public abstract class TVList implements WALEntryValue { protected boolean sorted = true; protected long maxTime; + protected long topKTime; + protected int sortCount; // record reference count of this tv list // currently this reference will only be increase because we can't know when to decrease it protected AtomicInteger referenceCount; @@ -66,7 +69,9 @@ public abstract class TVList implements WALEntryValue { protected TVList() { timestamps = new ArrayList<>(); rowCount = 0; + sortCount = 0; maxTime = Long.MIN_VALUE; + topKTime = Long.MIN_VALUE; referenceCount = new AtomicInteger(); } @@ -109,6 +114,8 @@ public abstract class TVList implements WALEntryValue { public abstract void sort(); + public abstract void sort(int lo, int hi); + public void increaseReferenceCount() { referenceCount.incrementAndGet(); } @@ -214,6 +221,10 @@ public abstract class TVList implements WALEntryValue { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } + public TVList divide() throws TopkDivideMemoryNotEnoughException { + throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); + } + public Object getAlignedValue(int index) { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } @@ -231,6 +242,10 @@ public abstract class TVList implements WALEntryValue { return maxTime; } + public long getTopKTime() { + return topKTime; + } + public long getVersion() { return version; } @@ -330,6 +345,7 @@ public abstract class TVList implements WALEntryValue { for (int i = start; i < end; i++) { inPutMinTime = Math.min(inPutMinTime, time[i]); maxTime = Math.max(maxTime, time[i]); + topKTime = Math.max(topKTime, time[Math.max(0, i - MEMTABLE_TOPK_SIZE)]); if (inputSorted && i < length - 1 && time[i] > time[i + 1]) { inputSorted = false; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java index 7c6fa7f92ef..819b00e3c93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimAlignedTVList.java @@ -50,7 +50,7 @@ public class TimAlignedTVList extends AlignedTVList implements TimSort { (int[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, rowCount); } if (!sorted) { - sort(0, rowCount); + this.timSort(0, rowCount); } clearSortedValue(); clearSortedTime(); @@ -58,7 +58,12 @@ public class TimAlignedTVList extends AlignedTVList implements TimSort { } @Override - public void tim_set(int src, int dest) { + public void sort(int lo, int hi) { + timSort(lo, hi); + } + + @Override + public void timSet(int src, int dest) { set(src, dest); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java index 03760fd14c8..fccb64a1b2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBinaryTVList.java @@ -44,14 +44,19 @@ public class TimBinaryTVList extends BinaryTVList implements TimSort { sortedValues = (Binary[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.TEXT, rowCount); } - sort(0, rowCount); + timSort(0, rowCount); clearSortedValue(); clearSortedTime(); sorted = true; } @Override - public void tim_set(int src, int dest) { + public void sort(int lo, int hi) { + timSort(lo, hi); + } + + @Override + public void timSet(int src, int dest) { set(src, dest); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java index 22b56475905..04d43f30bb9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimBooleanTVList.java @@ -43,7 +43,7 @@ public class TimBooleanTVList extends BooleanTVList implements TimSort { (boolean[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.BOOLEAN, rowCount); } if (!sorted) { - sort(0, rowCount); + timSort(0, rowCount); } clearSortedValue(); clearSortedTime(); @@ -51,7 +51,12 @@ public class TimBooleanTVList extends BooleanTVList implements TimSort { } @Override - public void tim_set(int src, int dest) { + public void sort(int lo, int hi) { + timSort(lo, hi); + } + + @Override + public void timSet(int src, int dest) { set(src, dest); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java index 3c0aa823798..543a4e2ec29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimDoubleTVList.java @@ -43,7 +43,7 @@ public class TimDoubleTVList extends DoubleTVList implements TimSort { (double[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.DOUBLE, rowCount); } if (!sorted) { - sort(0, rowCount); + timSort(0, rowCount); } clearSortedValue(); clearSortedTime(); @@ -51,7 +51,12 @@ public class TimDoubleTVList extends DoubleTVList implements TimSort { } @Override - public void tim_set(int src, int dest) { + public void sort(int lo, int hi) { + timSort(lo, hi); + } + + @Override + public void timSet(int src, int dest) { set(src, dest); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java index 2b1fb440a95..d360959a81e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimFloatTVList.java @@ -44,7 +44,7 @@ public class TimFloatTVList extends FloatTVList implements TimSort { (float[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.FLOAT, rowCount); } if (!sorted) { - sort(0, rowCount); + timSort(0, rowCount); } clearSortedValue(); clearSortedTime(); @@ -52,7 +52,12 @@ public class TimFloatTVList extends FloatTVList implements TimSort { } @Override - public void tim_set(int src, int dest) { + public void sort(int lo, int hi) { + timSort(lo, hi); + } + + @Override + public void timSet(int src, int dest) { set(src, dest); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java index b3ae939892d..d2f153e05a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimIntTVList.java @@ -33,18 +33,22 @@ public class TimIntTVList extends IntTVList implements TimSort { @Override public void sort() { + sort(0, rowCount); + } + + @Override + public void sort(int lo, int hi) { + int len = hi - lo; if (sortedTimestamps == null - || sortedTimestamps.length < PrimitiveArrayManager.getArrayRowCount(rowCount)) { + || sortedTimestamps.length < PrimitiveArrayManager.getArrayRowCount(len)) { sortedTimestamps = - (long[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, rowCount); + (long[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, len); } - if (sortedValues == null - || sortedValues.length < PrimitiveArrayManager.getArrayRowCount(rowCount)) { - sortedValues = - (int[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, rowCount); + if (sortedValues == null || sortedValues.length < PrimitiveArrayManager.getArrayRowCount(len)) { + sortedValues = (int[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT32, len); } if (!sorted) { - sort(0, rowCount); + timSort(0, len); } clearSortedValue(); clearSortedTime(); @@ -52,7 +56,7 @@ public class TimIntTVList extends IntTVList implements TimSort { } @Override - public void tim_set(int src, int dest) { + public void timSet(int src, int dest) { set(src, dest); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java index e3a02e41db5..9096cd47e1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimLongTVList.java @@ -43,7 +43,7 @@ public class TimLongTVList extends LongTVList implements TimSort { (long[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, rowCount); } if (!sorted) { - sort(0, rowCount); + timSort(0, rowCount); } clearSortedValue(); clearSortedTime(); @@ -51,7 +51,12 @@ public class TimLongTVList extends LongTVList implements TimSort { } @Override - public void tim_set(int src, int dest) { + public void sort(int lo, int hi) { + timSort(lo, hi); + } + + @Override + public void timSet(int src, int dest) { set(src, dest); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java index 63584429d4a..0426fdafe66 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TimSort.java @@ -27,7 +27,7 @@ public interface TimSort { int SMALL_ARRAY_LENGTH = 32; /** the same as the 'set' function in TVList, the reason is to avoid two equal functions. */ - void tim_set(int src, int dest); + void timSet(int src, int dest); void setFromSorted(int src, int dest); @@ -55,7 +55,7 @@ public interface TimSort { * the entrance of tim_sort; 1. array_size <= 32, use binary sort. 2. recursively invoke merge * sort. */ - default void sort(int lo, int hi) { + default void timSort(int lo, int hi) { if (lo == hi) { return; } @@ -65,8 +65,8 @@ public interface TimSort { return; } int mid = (lo + hi) >>> 1; - sort(lo, mid); - sort(mid, hi); + timSort(lo, mid); + timSort(mid, hi); merge(lo, mid, hi); } @@ -127,7 +127,7 @@ public interface TimSort { */ int n = start - left; // The number of elements to move for (int i = n; i >= 1; i--) { - tim_set(left + i - 1, left + i); + timSet(left + i - 1, left + i); } setPivotTo(left); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java new file mode 100644 index 00000000000..e1d8d727ec6 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TopkDivideMemoryNotEnoughException.java @@ -0,0 +1,8 @@ +package org.apache.iotdb.db.utils.datastructure; + +public class TopkDivideMemoryNotEnoughException extends Exception { + + public TopkDivideMemoryNotEnoughException(String message) { + super(message); + } +} diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties index 277e7c3d17c..6e971ec724c 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -516,6 +516,11 @@ cluster_name=defaultCluster # BACKWARD: backward sort # tvlist_sort_algorithm=TIM +# The number of points stay in the next tvlist +# The default waiting size is 0 +# Datatype: int +# seq_memtable_topk_size=0 + # When the average point number of timeseries in memtable exceeds this, the memtable is flushed to disk. The default threshold is 100000. # Datatype: int # avg_series_point_number_threshold=100000
