This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.3.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b6aa1a872ad4ddad433a0ddfa1fc53ad6beb44b6
Author: Haonan <[email protected]>
AuthorDate: Thu Jul 11 10:23:25 2024 +0800

    Limit the row count of a single TVList to avg_series_point_number_threshold 
(#12898)
    
    * init
    
    * optimize mem control code and flush metric
---
 .../iotdb/db/storageengine/StorageEngine.java      |  6 +-
 .../db/storageengine/dataregion/DataRegion.java    |  8 +--
 .../dataregion/memtable/AbstractMemTable.java      | 16 +++--
 .../memtable/AlignedWritableMemChunk.java          | 25 ++++----
 .../dataregion/memtable/IMemTable.java             |  7 +--
 .../dataregion/memtable/IWritableMemChunk.java     | 20 +++---
 .../dataregion/memtable/TsFileProcessor.java       | 28 ++++++---
 .../dataregion/memtable/WritableMemChunk.java      | 71 +++++++++++-----------
 .../db/utils/datastructure/AlignedTVList.java      |  4 +-
 .../iotdb/db/utils/datastructure/BinaryTVList.java |  4 +-
 .../iotdb/db/utils/datastructure/TVList.java       |  6 +-
 11 files changed, 98 insertions(+), 97 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index f7737201f19..71145964ebb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -161,13 +161,9 @@ public class StorageEngine implements IService {
   }
 
   /** block insertion if the insertion is rejected by memory control */
-  public static void blockInsertionIfReject(TsFileProcessor tsFileProcessor)
-      throws WriteProcessRejectException {
+  public static void blockInsertionIfReject() throws 
WriteProcessRejectException {
     long startTime = System.currentTimeMillis();
     while (SystemInfo.getInstance().isRejected()) {
-      if (tsFileProcessor != null && tsFileProcessor.shouldFlush()) {
-        break;
-      }
       try {
         TimeUnit.MILLISECONDS.sleep(CONFIG.getCheckPeriodWhenInsertBlocked());
         if (System.currentTimeMillis() - startTime > 
CONFIG.getMaxWaitingTimeWhenInsertBlocked()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 55e94d07375..d3a4ea1fcfe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -861,7 +861,7 @@ public class DataRegion implements IDataRegionForQuery {
       throw new OutOfTTLException(
           insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() - 
dataTTL));
     }
-    StorageEngine.blockInsertionIfReject(null);
+    StorageEngine.blockInsertionIfReject();
     long startTime = System.nanoTime();
     writeLock("InsertRow");
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - 
startTime);
@@ -910,7 +910,7 @@ public class DataRegion implements IDataRegionForQuery {
   @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
   public void insertTablet(InsertTabletNode insertTabletNode)
       throws BatchProcessException, WriteProcessException {
-    StorageEngine.blockInsertionIfReject(null);
+    StorageEngine.blockInsertionIfReject();
     long startTime = System.nanoTime();
     writeLock("insertTablet");
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - 
startTime);
@@ -3094,7 +3094,7 @@ public class DataRegion implements IDataRegionForQuery {
    */
   public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode)
       throws WriteProcessException, BatchProcessException {
-    StorageEngine.blockInsertionIfReject(null);
+    StorageEngine.blockInsertionIfReject();
     long startTime = System.nanoTime();
     writeLock("InsertRowsOfOneDevice");
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - 
startTime);
@@ -3204,7 +3204,7 @@ public class DataRegion implements IDataRegionForQuery {
 
   public void insert(InsertRowsNode insertRowsNode)
       throws BatchProcessException, WriteProcessRejectException {
-    StorageEngine.blockInsertionIfReject(null);
+    StorageEngine.blockInsertionIfReject();
     long startTime = System.nanoTime();
     writeLock("InsertRows");
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - 
startTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 07f65e229f8..e898736485e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -69,6 +69,7 @@ public abstract class AbstractMemTable implements IMemTable {
   private static final DeviceIDFactory deviceIDFactory = 
DeviceIDFactory.getInstance();
 
   private boolean shouldFlush = false;
+  private boolean reachChunkSizeOrPointNumThreshold = false;
   private volatile FlushStatus flushStatus = FlushStatus.WORKING;
   private final int avgSeriesPointNumThreshold =
       
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
@@ -318,7 +319,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     IWritableMemChunkGroup memChunkGroup =
         createMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
     if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, 
schemaList)) {
-      shouldFlush = true;
+      reachChunkSizeOrPointNumThreshold = true;
     }
   }
 
@@ -331,7 +332,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
     IWritableMemChunkGroup memChunkGroup =
         createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
     if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, 
schemaList)) {
-      shouldFlush = true;
+      reachChunkSizeOrPointNumThreshold = true;
     }
   }
 
@@ -353,7 +354,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
         schemaList,
         start,
         end)) {
-      shouldFlush = true;
+      reachChunkSizeOrPointNumThreshold = true;
     }
   }
 
@@ -379,7 +380,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
         schemaList,
         start,
         end)) {
-      shouldFlush = true;
+      reachChunkSizeOrPointNumThreshold = true;
     }
   }
 
@@ -426,11 +427,8 @@ public abstract class AbstractMemTable implements 
IMemTable {
   }
 
   @Override
-  public boolean reachTotalPointNumThreshold() {
-    if (totalPointsNum == 0) {
-      return false;
-    }
-    return totalPointsNum >= totalPointsNumThreshold;
+  public boolean reachChunkSizeOrPointNumThreshold() {
+    return reachChunkSizeOrPointNumThreshold;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 489520b201c..934b616e393 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -87,22 +87,22 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
   }
 
   @Override
-  public void putLong(long t, long v) {
+  public boolean putLongWithFlushCheck(long t, long v) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
TSDataType.VECTOR);
   }
 
   @Override
-  public void putInt(long t, int v) {
+  public boolean putIntWithFlushCheck(long t, int v) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
TSDataType.VECTOR);
   }
 
   @Override
-  public void putFloat(long t, float v) {
+  public boolean putFloatWithFlushCheck(long t, float v) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
TSDataType.VECTOR);
   }
 
   @Override
-  public void putDouble(long t, double v) {
+  public boolean putDoubleWithFlushCheck(long t, double v) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
TSDataType.VECTOR);
   }
 
@@ -112,33 +112,33 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
   }
 
   @Override
-  public void putBoolean(long t, boolean v) {
+  public boolean putBooleanWithFlushCheck(long t, boolean v) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
TSDataType.VECTOR);
   }
 
   @Override
   public boolean putAlignedValueWithFlushCheck(long t, Object[] v) {
     list.putAlignedValue(t, v);
-    return list.reachMaxChunkSizeThreshold();
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
-  public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) {
+  public boolean putLongsWithFlushCheck(long[] t, long[] v, BitMap bitMap, int 
start, int end) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
TSDataType.VECTOR);
   }
 
   @Override
-  public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) {
+  public boolean putIntsWithFlushCheck(long[] t, int[] v, BitMap bitMap, int 
start, int end) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
TSDataType.VECTOR);
   }
 
   @Override
-  public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int 
end) {
+  public boolean putFloatsWithFlushCheck(long[] t, float[] v, BitMap bitMap, 
int start, int end) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
TSDataType.VECTOR);
   }
 
   @Override
-  public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int 
end) {
+  public boolean putDoublesWithFlushCheck(long[] t, double[] v, BitMap bitMap, 
int start, int end) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
TSDataType.VECTOR);
   }
 
@@ -149,7 +149,8 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
   }
 
   @Override
-  public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int 
end) {
+  public boolean putBooleansWithFlushCheck(
+      long[] t, boolean[] v, BitMap bitMap, int start, int end) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
TSDataType.VECTOR);
   }
 
@@ -157,7 +158,7 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
   public boolean putAlignedValuesWithFlushCheck(
       long[] t, Object[] v, BitMap[] bitMaps, int start, int end) {
     list.putAlignedValues(t, v, bitMaps, start, end);
-    return list.reachMaxChunkSizeThreshold();
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index b2288f174f8..222b4199105 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -75,12 +75,7 @@ public interface IMemTable extends WALEntryValue {
   /** only used when mem control enabled */
   long getTVListsRamCost();
 
-  /**
-   * only used when mem control enabled
-   *
-   * @return whether the average number of points in each WritableChunk 
reaches the threshold
-   */
-  boolean reachTotalPointNumThreshold();
+  boolean reachChunkSizeOrPointNumThreshold();
 
   int getSeriesNumber();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 4c07c3ece33..fea1770fdee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -30,31 +30,31 @@ import java.util.List;
 
 public interface IWritableMemChunk extends WALEntryValue {
 
-  void putLong(long t, long v);
+  boolean putLongWithFlushCheck(long t, long v);
 
-  void putInt(long t, int v);
+  boolean putIntWithFlushCheck(long t, int v);
 
-  void putFloat(long t, float v);
+  boolean putFloatWithFlushCheck(long t, float v);
 
-  void putDouble(long t, double v);
+  boolean putDoubleWithFlushCheck(long t, double v);
 
   boolean putBinaryWithFlushCheck(long t, Binary v);
 
-  void putBoolean(long t, boolean v);
+  boolean putBooleanWithFlushCheck(long t, boolean v);
 
   boolean putAlignedValueWithFlushCheck(long t, Object[] v);
 
-  void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end);
+  boolean putLongsWithFlushCheck(long[] t, long[] v, BitMap bitMap, int start, 
int end);
 
-  void putInts(long[] t, int[] v, BitMap bitMap, int start, int end);
+  boolean putIntsWithFlushCheck(long[] t, int[] v, BitMap bitMap, int start, 
int end);
 
-  void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end);
+  boolean putFloatsWithFlushCheck(long[] t, float[] v, BitMap bitMap, int 
start, int end);
 
-  void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end);
+  boolean putDoublesWithFlushCheck(long[] t, double[] v, BitMap bitMap, int 
start, int end);
 
   boolean putBinariesWithFlushCheck(long[] t, Binary[] v, BitMap bitMap, int 
start, int end);
 
-  void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end);
+  boolean putBooleansWithFlushCheck(long[] t, boolean[] v, BitMap bitMap, int 
start, int end);
 
   boolean putAlignedValuesWithFlushCheck(
       long[] t, Object[] v, BitMap[] bitMaps, int start, int end);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 680f30389b5..6233b426b3d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -43,7 +43,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
-import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
 import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener;
@@ -91,6 +90,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -909,7 +909,22 @@ public class TsFileProcessor {
     if (dataRegionInfo.needToReportToSystem()) {
       try {
         if (!SystemInfo.getInstance().reportStorageGroupStatus(dataRegionInfo, 
this)) {
-          StorageEngine.blockInsertionIfReject(this);
+          long startTime = System.currentTimeMillis();
+          while (SystemInfo.getInstance().isRejected()) {
+            if (workMemTable.shouldFlush()) {
+              break;
+            }
+            try {
+              
TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
+              if (System.currentTimeMillis() - startTime
+                  > config.getMaxWaitingTimeWhenInsertBlocked()) {
+                throw new WriteProcessRejectException(
+                    "System rejected over " + (System.currentTimeMillis() - 
startTime) + "ms");
+              }
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+          }
         }
       } catch (WriteProcessRejectException e) {
         dataRegionInfo.releaseStorageGroupMemCost(memTableIncrement);
@@ -987,13 +1002,8 @@ public class TsFileProcessor {
     if (workMemTable.shouldFlush()) {
       return true;
     }
-    if (workMemTable.reachTotalPointNumThreshold()) {
-      logger.info(
-          "The avg series points num {} of tsfile {} reaches the threshold",
-          workMemTable.getTotalPointsNum() / workMemTable.getSeriesNumber(),
-          tsFileResource.getTsFile().getAbsolutePath());
-      WritingMetrics.getInstance()
-          
.recordSeriesFullFlushMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(),
 1);
+    if (workMemTable.reachChunkSizeOrPointNumThreshold()) {
+      
WritingMetrics.getInstance().recordSeriesFullFlushMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(),
 1);
       return true;
     }
     return false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index c1a71d32a53..134bf4310e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -61,26 +61,20 @@ public class WritableMemChunk implements IWritableMemChunk {
   public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
     switch (schema.getType()) {
       case BOOLEAN:
-        putBoolean(insertTime, (boolean) objectValue);
-        break;
+        return putBooleanWithFlushCheck(insertTime, (boolean) objectValue);
       case INT32:
-        putInt(insertTime, (int) objectValue);
-        break;
+        return putIntWithFlushCheck(insertTime, (int) objectValue);
       case INT64:
-        putLong(insertTime, (long) objectValue);
-        break;
+        return putLongWithFlushCheck(insertTime, (long) objectValue);
       case FLOAT:
-        putFloat(insertTime, (float) objectValue);
-        break;
+        return putFloatWithFlushCheck(insertTime, (float) objectValue);
       case DOUBLE:
-        putDouble(insertTime, (double) objectValue);
-        break;
+        return putDoubleWithFlushCheck(insertTime, (double) objectValue);
       case TEXT:
         return putBinaryWithFlushCheck(insertTime, (Binary) objectValue);
       default:
-        throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
+        throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType().name());
     }
-    return false;
   }
 
   @Override
@@ -95,31 +89,25 @@ public class WritableMemChunk implements IWritableMemChunk {
     switch (dataType) {
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) valueList;
-        putBooleans(times, boolValues, bitMap, start, end);
-        break;
+        return putBooleansWithFlushCheck(times, boolValues, bitMap, start, 
end);
       case INT32:
         int[] intValues = (int[]) valueList;
-        putInts(times, intValues, bitMap, start, end);
-        break;
+        return putIntsWithFlushCheck(times, intValues, bitMap, start, end);
       case INT64:
         long[] longValues = (long[]) valueList;
-        putLongs(times, longValues, bitMap, start, end);
-        break;
+        return putLongsWithFlushCheck(times, longValues, bitMap, start, end);
       case FLOAT:
         float[] floatValues = (float[]) valueList;
-        putFloats(times, floatValues, bitMap, start, end);
-        break;
+        return putFloatsWithFlushCheck(times, floatValues, bitMap, start, end);
       case DOUBLE:
         double[] doubleValues = (double[]) valueList;
-        putDoubles(times, doubleValues, bitMap, start, end);
-        break;
+        return putDoublesWithFlushCheck(times, doubleValues, bitMap, start, 
end);
       case TEXT:
         Binary[] binaryValues = (Binary[]) valueList;
         return putBinariesWithFlushCheck(times, binaryValues, bitMap, start, 
end);
       default:
-        throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + dataType);
+        throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
dataType.name());
     }
-    return false;
   }
 
   @Override
@@ -134,34 +122,39 @@ public class WritableMemChunk implements 
IWritableMemChunk {
   }
 
   @Override
-  public void putLong(long t, long v) {
+  public boolean putLongWithFlushCheck(long t, long v) {
     list.putLong(t, v);
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
-  public void putInt(long t, int v) {
+  public boolean putIntWithFlushCheck(long t, int v) {
     list.putInt(t, v);
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
-  public void putFloat(long t, float v) {
+  public boolean putFloatWithFlushCheck(long t, float v) {
     list.putFloat(t, v);
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
-  public void putDouble(long t, double v) {
+  public boolean putDoubleWithFlushCheck(long t, double v) {
     list.putDouble(t, v);
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
   public boolean putBinaryWithFlushCheck(long t, Binary v) {
     list.putBinary(t, v);
-    return list.reachMaxChunkSizeThreshold();
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
-  public void putBoolean(long t, boolean v) {
+  public boolean putBooleanWithFlushCheck(long t, boolean v) {
     list.putBoolean(t, v);
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
@@ -170,35 +163,41 @@ public class WritableMemChunk implements 
IWritableMemChunk {
   }
 
   @Override
-  public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) {
+  public boolean putLongsWithFlushCheck(long[] t, long[] v, BitMap bitMap, int 
start, int end) {
     list.putLongs(t, v, bitMap, start, end);
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
-  public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) {
+  public boolean putIntsWithFlushCheck(long[] t, int[] v, BitMap bitMap, int 
start, int end) {
     list.putInts(t, v, bitMap, start, end);
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
-  public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int 
end) {
+  public boolean putFloatsWithFlushCheck(long[] t, float[] v, BitMap bitMap, 
int start, int end) {
     list.putFloats(t, v, bitMap, start, end);
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
-  public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int 
end) {
+  public boolean putDoublesWithFlushCheck(long[] t, double[] v, BitMap bitMap, 
int start, int end) {
     list.putDoubles(t, v, bitMap, start, end);
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
   public boolean putBinariesWithFlushCheck(
       long[] t, Binary[] v, BitMap bitMap, int start, int end) {
     list.putBinaries(t, v, bitMap, start, end);
-    return list.reachMaxChunkSizeThreshold();
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
-  public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int 
end) {
+  public boolean putBooleansWithFlushCheck(
+      long[] t, boolean[] v, BitMap bitMap, int start, int end) {
     list.putBooleans(t, v, bitMap, start, end);
+    return list.reachChunkSizeOrPointNumThreshold();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 4092db7d442..4949044d590 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -698,8 +698,8 @@ public abstract class AlignedTVList extends TVList {
   }
 
   @Override
-  public boolean reachMaxChunkSizeThreshold() {
-    return reachMaxChunkSizeFlag;
+  public boolean reachChunkSizeOrPointNumThreshold() {
+    return reachMaxChunkSizeFlag || rowCount >= MAX_SERIES_POINT_NUMBER;
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index 4c5d4061530..bce5dc083c3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -98,8 +98,8 @@ public abstract class BinaryTVList extends TVList {
   }
 
   @Override
-  public boolean reachMaxChunkSizeThreshold() {
-    return memoryBinaryChunkSize >= TARGET_CHUNK_SIZE;
+  public boolean reachChunkSizeOrPointNumThreshold() {
+    return memoryBinaryChunkSize >= TARGET_CHUNK_SIZE || rowCount >= 
MAX_SERIES_POINT_NUMBER;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index f2b26158a5f..f6c34ba0d62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -49,6 +49,8 @@ public abstract class TVList implements WALEntryValue {
   protected static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not 
consistent";
   protected static final long TARGET_CHUNK_SIZE =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+  protected static final long MAX_SERIES_POINT_NUMBER =
+      
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
   // list of timestamp array, add 1 when expanded -> data point timestamp array
   // index relation: arrayIndex -> elementIndex
   protected List<long[]> timestamps;
@@ -148,8 +150,8 @@ public abstract class TVList implements WALEntryValue {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
-  public boolean reachMaxChunkSizeThreshold() {
-    return false;
+  public boolean reachChunkSizeOrPointNumThreshold() {
+    return rowCount >= MAX_SERIES_POINT_NUMBER;
   }
 
   public void putBoolean(long time, boolean value) {

Reply via email to