This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch rc/1.3.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b6aa1a872ad4ddad433a0ddfa1fc53ad6beb44b6 Author: Haonan <[email protected]> AuthorDate: Thu Jul 11 10:23:25 2024 +0800 Limit the row count of a single TVList to avg_series_point_number_threshold (#12898) * init * optimize mem control code and flush metric --- .../iotdb/db/storageengine/StorageEngine.java | 6 +- .../db/storageengine/dataregion/DataRegion.java | 8 +-- .../dataregion/memtable/AbstractMemTable.java | 16 +++-- .../memtable/AlignedWritableMemChunk.java | 25 ++++---- .../dataregion/memtable/IMemTable.java | 7 +-- .../dataregion/memtable/IWritableMemChunk.java | 20 +++--- .../dataregion/memtable/TsFileProcessor.java | 28 ++++++--- .../dataregion/memtable/WritableMemChunk.java | 71 +++++++++++----------- .../db/utils/datastructure/AlignedTVList.java | 4 +- .../iotdb/db/utils/datastructure/BinaryTVList.java | 4 +- .../iotdb/db/utils/datastructure/TVList.java | 6 +- 11 files changed, 98 insertions(+), 97 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index f7737201f19..71145964ebb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -161,13 +161,9 @@ public class StorageEngine implements IService { } /** block insertion if the insertion is rejected by memory control */ - public static void blockInsertionIfReject(TsFileProcessor tsFileProcessor) - throws WriteProcessRejectException { + public static void blockInsertionIfReject() throws WriteProcessRejectException { long startTime = System.currentTimeMillis(); while (SystemInfo.getInstance().isRejected()) { - if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) { - break; - } try { TimeUnit.MILLISECONDS.sleep(CONFIG.getCheckPeriodWhenInsertBlocked()); if (System.currentTimeMillis() - startTime > CONFIG.getMaxWaitingTimeWhenInsertBlocked()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 55e94d07375..d3a4ea1fcfe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -861,7 +861,7 @@ public class DataRegion implements IDataRegionForQuery { throw new OutOfTTLException( insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() - dataTTL)); } - StorageEngine.blockInsertionIfReject(null); + StorageEngine.blockInsertionIfReject(); long startTime = System.nanoTime(); writeLock("InsertRow"); PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); @@ -910,7 +910,7 @@ public class DataRegion implements IDataRegionForQuery { @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive Complexity warning public void insertTablet(InsertTabletNode insertTabletNode) throws BatchProcessException, WriteProcessException { - StorageEngine.blockInsertionIfReject(null); + StorageEngine.blockInsertionIfReject(); long startTime = System.nanoTime(); writeLock("insertTablet"); PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); @@ -3094,7 +3094,7 @@ public class DataRegion implements IDataRegionForQuery { */ public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode) throws WriteProcessException, BatchProcessException { - StorageEngine.blockInsertionIfReject(null); + StorageEngine.blockInsertionIfReject(); long startTime = System.nanoTime(); writeLock("InsertRowsOfOneDevice"); PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); @@ -3204,7 +3204,7 @@ public class DataRegion implements IDataRegionForQuery { public void insert(InsertRowsNode insertRowsNode) throws BatchProcessException, WriteProcessRejectException { - StorageEngine.blockInsertionIfReject(null); + StorageEngine.blockInsertionIfReject(); long startTime = System.nanoTime(); writeLock("InsertRows"); PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 07f65e229f8..e898736485e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -69,6 +69,7 @@ public abstract class AbstractMemTable implements IMemTable { private static final DeviceIDFactory deviceIDFactory = DeviceIDFactory.getInstance(); private boolean shouldFlush = false; + private boolean reachChunkSizeOrPointNumThreshold = false; private volatile FlushStatus flushStatus = FlushStatus.WORKING; private final int avgSeriesPointNumThreshold = IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold(); @@ -318,7 +319,7 @@ public abstract class AbstractMemTable implements IMemTable { IWritableMemChunkGroup memChunkGroup = createMemChunkGroupIfNotExistAndGet(deviceId, schemaList); if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, schemaList)) { - shouldFlush = true; + reachChunkSizeOrPointNumThreshold = true; } } @@ -331,7 +332,7 @@ public abstract class AbstractMemTable implements IMemTable { IWritableMemChunkGroup memChunkGroup = createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList); if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, schemaList)) { - shouldFlush = true; + reachChunkSizeOrPointNumThreshold = true; } } @@ -353,7 +354,7 @@ public abstract class AbstractMemTable implements IMemTable { schemaList, start, end)) { - shouldFlush = true; + reachChunkSizeOrPointNumThreshold = true; } } @@ -379,7 +380,7 @@ public abstract class AbstractMemTable implements IMemTable { schemaList, start, end)) { - shouldFlush = true; + reachChunkSizeOrPointNumThreshold = true; } } @@ -426,11 +427,8 @@ public abstract class AbstractMemTable implements IMemTable { } @Override - public boolean reachTotalPointNumThreshold() { - if (totalPointsNum == 0) { - return false; - } - return totalPointsNum >= totalPointsNumThreshold; + public boolean reachChunkSizeOrPointNumThreshold() { + return reachChunkSizeOrPointNumThreshold; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 489520b201c..934b616e393 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -87,22 +87,22 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { } @Override - public void putLong(long t, long v) { + public boolean putLongWithFlushCheck(long t, long v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public void putInt(long t, int v) { + public boolean putIntWithFlushCheck(long t, int v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public void putFloat(long t, float v) { + public boolean putFloatWithFlushCheck(long t, float v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public void putDouble(long t, double v) { + public boolean putDoubleWithFlushCheck(long t, double v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @@ -112,33 +112,33 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { } @Override - public void putBoolean(long t, boolean v) { + public boolean putBooleanWithFlushCheck(long t, boolean v) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override public boolean putAlignedValueWithFlushCheck(long t, Object[] v) { list.putAlignedValue(t, v); - return list.reachMaxChunkSizeThreshold(); + return list.reachChunkSizeOrPointNumThreshold(); } @Override - public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) { + public boolean putLongsWithFlushCheck(long[] t, long[] v, BitMap bitMap, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) { + public boolean putIntsWithFlushCheck(long[] t, int[] v, BitMap bitMap, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end) { + public boolean putFloatsWithFlushCheck(long[] t, float[] v, BitMap bitMap, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @Override - public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end) { + public boolean putDoublesWithFlushCheck(long[] t, double[] v, BitMap bitMap, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @@ -149,7 +149,8 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { } @Override - public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end) { + public boolean putBooleansWithFlushCheck( + long[] t, boolean[] v, BitMap bitMap, int start, int end) { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR); } @@ -157,7 +158,7 @@ public class AlignedWritableMemChunk implements IWritableMemChunk { public boolean putAlignedValuesWithFlushCheck( long[] t, Object[] v, BitMap[] bitMaps, int start, int end) { list.putAlignedValues(t, v, bitMaps, start, end); - return list.reachMaxChunkSizeThreshold(); + return list.reachChunkSizeOrPointNumThreshold(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java index b2288f174f8..222b4199105 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java @@ -75,12 +75,7 @@ public interface IMemTable extends WALEntryValue { /** only used when mem control enabled */ long getTVListsRamCost(); - /** - * only used when mem control enabled - * - * @return whether the average number of points in each WritableChunk reaches the threshold - */ - boolean reachTotalPointNumThreshold(); + boolean reachChunkSizeOrPointNumThreshold(); int getSeriesNumber(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java index 4c07c3ece33..fea1770fdee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java @@ -30,31 +30,31 @@ import java.util.List; public interface IWritableMemChunk extends WALEntryValue { - void putLong(long t, long v); + boolean putLongWithFlushCheck(long t, long v); - void putInt(long t, int v); + boolean putIntWithFlushCheck(long t, int v); - void putFloat(long t, float v); + boolean putFloatWithFlushCheck(long t, float v); - void putDouble(long t, double v); + boolean putDoubleWithFlushCheck(long t, double v); boolean putBinaryWithFlushCheck(long t, Binary v); - void putBoolean(long t, boolean v); + boolean putBooleanWithFlushCheck(long t, boolean v); boolean putAlignedValueWithFlushCheck(long t, Object[] v); - void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end); + boolean putLongsWithFlushCheck(long[] t, long[] v, BitMap bitMap, int start, int end); - void putInts(long[] t, int[] v, BitMap bitMap, int start, int end); + boolean putIntsWithFlushCheck(long[] t, int[] v, BitMap bitMap, int start, int end); - void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end); + boolean putFloatsWithFlushCheck(long[] t, float[] v, BitMap bitMap, int start, int end); - void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end); + boolean putDoublesWithFlushCheck(long[] t, double[] v, BitMap bitMap, int start, int end); boolean putBinariesWithFlushCheck(long[] t, Binary[] v, BitMap bitMap, int start, int end); - void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end); + boolean putBooleansWithFlushCheck(long[] t, boolean[] v, BitMap bitMap, int start, int end); boolean putAlignedValuesWithFlushCheck( long[] t, Object[] v, BitMap[] bitMaps, int start, int end); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 680f30389b5..6233b426b3d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -43,7 +43,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils; import org.apache.iotdb.db.service.metrics.WritingMetrics; -import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo; import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener; @@ -91,6 +90,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -909,7 +909,22 @@ public class TsFileProcessor { if (dataRegionInfo.needToReportToSystem()) { try { if (!SystemInfo.getInstance().reportStorageGroupStatus(dataRegionInfo, this)) { - StorageEngine.blockInsertionIfReject(this); + long startTime = System.currentTimeMillis(); + while (SystemInfo.getInstance().isRejected()) { + if (workMemTable.shouldFlush()) { + break; + } + try { + TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked()); + if (System.currentTimeMillis() - startTime + > config.getMaxWaitingTimeWhenInsertBlocked()) { + throw new WriteProcessRejectException( + "System rejected over " + (System.currentTimeMillis() - startTime) + "ms"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } } catch (WriteProcessRejectException e) { dataRegionInfo.releaseStorageGroupMemCost(memTableIncrement); @@ -987,13 +1002,8 @@ public class TsFileProcessor { if (workMemTable.shouldFlush()) { return true; } - if (workMemTable.reachTotalPointNumThreshold()) { - logger.info( - "The avg series points num {} of tsfile {} reaches the threshold", - workMemTable.getTotalPointsNum() / workMemTable.getSeriesNumber(), - tsFileResource.getTsFile().getAbsolutePath()); - WritingMetrics.getInstance() - .recordSeriesFullFlushMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1); + if (workMemTable.reachChunkSizeOrPointNumThreshold()) { + WritingMetrics.getInstance().recordSeriesFullFlushMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1); return true; } return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index c1a71d32a53..134bf4310e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -61,26 +61,20 @@ public class WritableMemChunk implements IWritableMemChunk { public boolean writeWithFlushCheck(long insertTime, Object objectValue) { switch (schema.getType()) { case BOOLEAN: - putBoolean(insertTime, (boolean) objectValue); - break; + return putBooleanWithFlushCheck(insertTime, (boolean) objectValue); case INT32: - putInt(insertTime, (int) objectValue); - break; + return putIntWithFlushCheck(insertTime, (int) objectValue); case INT64: - putLong(insertTime, (long) objectValue); - break; + return putLongWithFlushCheck(insertTime, (long) objectValue); case FLOAT: - putFloat(insertTime, (float) objectValue); - break; + return putFloatWithFlushCheck(insertTime, (float) objectValue); case DOUBLE: - putDouble(insertTime, (double) objectValue); - break; + return putDoubleWithFlushCheck(insertTime, (double) objectValue); case TEXT: return putBinaryWithFlushCheck(insertTime, (Binary) objectValue); default: - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType().name()); } - return false; } @Override @@ -95,31 +89,25 @@ public class WritableMemChunk implements IWritableMemChunk { switch (dataType) { case BOOLEAN: boolean[] boolValues = (boolean[]) valueList; - putBooleans(times, boolValues, bitMap, start, end); - break; + return putBooleansWithFlushCheck(times, boolValues, bitMap, start, end); case INT32: int[] intValues = (int[]) valueList; - putInts(times, intValues, bitMap, start, end); - break; + return putIntsWithFlushCheck(times, intValues, bitMap, start, end); case INT64: long[] longValues = (long[]) valueList; - putLongs(times, longValues, bitMap, start, end); - break; + return putLongsWithFlushCheck(times, longValues, bitMap, start, end); case FLOAT: float[] floatValues = (float[]) valueList; - putFloats(times, floatValues, bitMap, start, end); - break; + return putFloatsWithFlushCheck(times, floatValues, bitMap, start, end); case DOUBLE: double[] doubleValues = (double[]) valueList; - putDoubles(times, doubleValues, bitMap, start, end); - break; + return putDoublesWithFlushCheck(times, doubleValues, bitMap, start, end); case TEXT: Binary[] binaryValues = (Binary[]) valueList; return putBinariesWithFlushCheck(times, binaryValues, bitMap, start, end); default: - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + dataType); + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + dataType.name()); } - return false; } @Override @@ -134,34 +122,39 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override - public void putLong(long t, long v) { + public boolean putLongWithFlushCheck(long t, long v) { list.putLong(t, v); + return list.reachChunkSizeOrPointNumThreshold(); } @Override - public void putInt(long t, int v) { + public boolean putIntWithFlushCheck(long t, int v) { list.putInt(t, v); + return list.reachChunkSizeOrPointNumThreshold(); } @Override - public void putFloat(long t, float v) { + public boolean putFloatWithFlushCheck(long t, float v) { list.putFloat(t, v); + return list.reachChunkSizeOrPointNumThreshold(); } @Override - public void putDouble(long t, double v) { + public boolean putDoubleWithFlushCheck(long t, double v) { list.putDouble(t, v); + return list.reachChunkSizeOrPointNumThreshold(); } @Override public boolean putBinaryWithFlushCheck(long t, Binary v) { list.putBinary(t, v); - return list.reachMaxChunkSizeThreshold(); + return list.reachChunkSizeOrPointNumThreshold(); } @Override - public void putBoolean(long t, boolean v) { + public boolean putBooleanWithFlushCheck(long t, boolean v) { list.putBoolean(t, v); + return list.reachChunkSizeOrPointNumThreshold(); } @Override @@ -170,35 +163,41 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override - public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) { + public boolean putLongsWithFlushCheck(long[] t, long[] v, BitMap bitMap, int start, int end) { list.putLongs(t, v, bitMap, start, end); + return list.reachChunkSizeOrPointNumThreshold(); } @Override - public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) { + public boolean putIntsWithFlushCheck(long[] t, int[] v, BitMap bitMap, int start, int end) { list.putInts(t, v, bitMap, start, end); + return list.reachChunkSizeOrPointNumThreshold(); } @Override - public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end) { + public boolean putFloatsWithFlushCheck(long[] t, float[] v, BitMap bitMap, int start, int end) { list.putFloats(t, v, bitMap, start, end); + return list.reachChunkSizeOrPointNumThreshold(); } @Override - public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end) { + public boolean putDoublesWithFlushCheck(long[] t, double[] v, BitMap bitMap, int start, int end) { list.putDoubles(t, v, bitMap, start, end); + return list.reachChunkSizeOrPointNumThreshold(); } @Override public boolean putBinariesWithFlushCheck( long[] t, Binary[] v, BitMap bitMap, int start, int end) { list.putBinaries(t, v, bitMap, start, end); - return list.reachMaxChunkSizeThreshold(); + return list.reachChunkSizeOrPointNumThreshold(); } @Override - public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end) { + public boolean putBooleansWithFlushCheck( + long[] t, boolean[] v, BitMap bitMap, int start, int end) { list.putBooleans(t, v, bitMap, start, end); + return list.reachChunkSizeOrPointNumThreshold(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 4092db7d442..4949044d590 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -698,8 +698,8 @@ public abstract class AlignedTVList extends TVList { } @Override - public boolean reachMaxChunkSizeThreshold() { - return reachMaxChunkSizeFlag; + public boolean reachChunkSizeOrPointNumThreshold() { + return reachMaxChunkSizeFlag || rowCount >= MAX_SERIES_POINT_NUMBER; } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index 4c5d4061530..bce5dc083c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -98,8 +98,8 @@ public abstract class BinaryTVList extends TVList { } @Override - public boolean reachMaxChunkSizeThreshold() { - return memoryBinaryChunkSize >= TARGET_CHUNK_SIZE; + public boolean reachChunkSizeOrPointNumThreshold() { + return memoryBinaryChunkSize >= TARGET_CHUNK_SIZE || rowCount >= MAX_SERIES_POINT_NUMBER; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index f2b26158a5f..f6c34ba0d62 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -49,6 +49,8 @@ public abstract class TVList implements WALEntryValue { protected static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent"; protected static final long TARGET_CHUNK_SIZE = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); + protected static final long MAX_SERIES_POINT_NUMBER = + IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold(); // list of timestamp array, add 1 when expanded -> data point timestamp array // index relation: arrayIndex -> elementIndex protected List<long[]> timestamps; @@ -148,8 +150,8 @@ public abstract class TVList implements WALEntryValue { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } - public boolean reachMaxChunkSizeThreshold() { - return false; + public boolean reachChunkSizeOrPointNumThreshold() { + return rowCount >= MAX_SERIES_POINT_NUMBER; } public void putBoolean(long time, boolean value) {
