This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch split_text_chunk
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit de38a7de25d3606c3f8228e16475b296f8b49a48
Author: HTHou <[email protected]>
AuthorDate: Thu Apr 25 14:41:39 2024 +0800

    Split non_aligned large text chunk
---
 .../dataregion/flush/MemTableFlushTask.java        | 11 +-----
 .../dataregion/memtable/IWritableMemChunk.java     |  3 +-
 .../dataregion/memtable/WritableMemChunk.java      | 40 +++++++++++++++++-----
 .../iotdb/db/utils/datastructure/BinaryTVList.java | 20 -----------
 .../db/utils/datastructure/BinaryTVListTest.java   | 30 ----------------
 5 files changed, 35 insertions(+), 69 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 45a398b96bb..c3aea6bd2e8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -247,16 +247,7 @@ public class MemTableFlushTask {
             } else {
               long starTime = System.currentTimeMillis();
               IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
-              IChunkWriter seriesWriter = 
writableMemChunk.createIChunkWriter();
-              writableMemChunk.encode(seriesWriter);
-              seriesWriter.sealCurrentPage();
-              seriesWriter.clearPageWriter();
-              try {
-                ioTaskQueue.put(seriesWriter);
-              } catch (InterruptedException e) {
-                LOGGER.error("Put task into ioTaskQueue Interrupted");
-                Thread.currentThread().interrupt();
-              }
+              writableMemChunk.encode(ioTaskQueue);
               long subTaskTime = System.currentTimeMillis() - starTime;
               
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, 
subTaskTime);
               memSerializeTime += subTaskTime;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 8e83b60b899..2c9f8d20354 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -28,6 +28,7 @@ import org.apache.tsfile.write.chunk.IChunkWriter;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 
 public interface IWritableMemChunk extends WALEntryValue {
 
@@ -134,7 +135,7 @@ public interface IWritableMemChunk extends WALEntryValue {
 
   IChunkWriter createIChunkWriter();
 
-  void encode(IChunkWriter chunkWriter);
+  void encode(LinkedBlockingQueue<Object> ioTaskQueue);
 
   void release();
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index a110f772652..1040a068900 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -30,7 +30,6 @@ import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.chunk.ChunkWriterImpl;
-import org.apache.tsfile.write.chunk.IChunkWriter;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
@@ -40,12 +39,18 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 
 public class WritableMemChunk implements IWritableMemChunk {
 
   private IMeasurementSchema schema;
   private TVList list;
   private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
+
+  private static final long TARGET_CHUNK_SIZE =
+      IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WritableMemChunk.class);
 
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
@@ -285,7 +290,7 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public IChunkWriter createIChunkWriter() {
+  public ChunkWriterImpl createIChunkWriter() {
     return new ChunkWriterImpl(schema);
   }
 
@@ -322,15 +327,14 @@ public class WritableMemChunk implements IWritableMemChunk {
   }
 
   @Override
-  public void encode(IChunkWriter chunkWriter) {
-
-    ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
+  public void encode(LinkedBlockingQueue<Object> ioTaskQueue) {
 
+    TSDataType tsDataType = schema.getType();
+    ChunkWriterImpl chunkWriterImpl = createIChunkWriter();
+    long binarySizePerChunk = 0;
     for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); 
sortedRowIndex++) {
       long time = list.getTime(sortedRowIndex);
 
-      TSDataType tsDataType = schema.getType();
-
       // skip duplicated data
       if ((sortedRowIndex + 1 < list.rowCount() && (time == 
list.getTime(sortedRowIndex + 1)))) {
         long recordSize =
@@ -364,13 +368,33 @@ public class WritableMemChunk implements IWritableMemChunk {
           chunkWriterImpl.write(time, list.getDouble(sortedRowIndex));
           break;
         case TEXT:
-          chunkWriterImpl.write(time, list.getBinary(sortedRowIndex));
+          Binary value = list.getBinary(sortedRowIndex);
+          chunkWriterImpl.write(time, value);
+          binarySizePerChunk += getBinarySize(value);
+          if (binarySizePerChunk > TARGET_CHUNK_SIZE) {
+            chunkWriterImpl.sealCurrentPage();
+            chunkWriterImpl.clearPageWriter();
+            try {
+              ioTaskQueue.put(chunkWriterImpl);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+            chunkWriterImpl = createIChunkWriter();
+            binarySizePerChunk = 0;
+          }
           break;
         default:
           LOGGER.error("WritableMemChunk does not support data type: {}", 
tsDataType);
           break;
       }
     }
+    chunkWriterImpl.sealCurrentPage();
+    chunkWriterImpl.clearPageWriter();
+    try {
+      ioTaskQueue.put(chunkWriterImpl);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index 8f90296d774..7ade12a86f8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -39,20 +39,15 @@ import java.util.List;
 
 import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
 import static 
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
-import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
 
 public abstract class BinaryTVList extends TVList {
   // list of primitive array, add 1 when expanded -> Binary primitive array
   // index relation: arrayIndex -> elementIndex
   protected List<Binary[]> values;
 
-  // record total memory size of binary tvlist
-  long memoryBinaryChunkSize;
-
   BinaryTVList() {
     super();
     values = new ArrayList<>();
-    memoryBinaryChunkSize = 0;
   }
 
   public static BinaryTVList newList() {
@@ -70,7 +65,6 @@ public abstract class BinaryTVList extends TVList {
   public TimBinaryTVList clone() {
     TimBinaryTVList cloneList = new TimBinaryTVList();
     cloneAs(cloneList);
-    cloneList.memoryBinaryChunkSize = memoryBinaryChunkSize;
     for (Binary[] valueArray : values) {
       cloneList.values.add(cloneValue(valueArray));
     }
@@ -95,12 +89,6 @@ public abstract class BinaryTVList extends TVList {
     if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
       sorted = false;
     }
-    memoryBinaryChunkSize += getBinarySize(value);
-  }
-
-  @Override
-  public boolean reachMaxChunkSizeThreshold() {
-    return memoryBinaryChunkSize >= TARGET_CHUNK_SIZE;
   }
 
   @Override
@@ -112,8 +100,6 @@ public abstract class BinaryTVList extends TVList {
       if (time < lowerBound || time > upperBound) {
         set(i, newSize++);
         maxTime = Math.max(maxTime, time);
-      } else {
-        memoryBinaryChunkSize -= getBinarySize(getBinary(i));
       }
     }
     int deletedNumber = rowCount - newSize;
@@ -159,7 +145,6 @@ public abstract class BinaryTVList extends TVList {
       }
       values.clear();
     }
-    memoryBinaryChunkSize = 0;
   }
 
   @Override
@@ -222,11 +207,6 @@ public abstract class BinaryTVList extends TVList {
       updateMaxTimeAndSorted(time, start, end);
     }
 
-    // update raw size
-    for (int i = idx; i < end; i++) {
-      memoryBinaryChunkSize += getBinarySize(value[i]);
-    }
-
     while (idx < end) {
       int inputRemaining = end - idx;
       int arrayIdx = rowCount / ARRAY_SIZE;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java
index 1208ce4878d..0ac18c375c3 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java
@@ -107,35 +107,5 @@ public class BinaryTVListTest {
       Assert.assertEquals(tvList.getBinary((int) i), 
clonedTvList.getBinary((int) i));
       Assert.assertEquals(tvList.getTime((int) i), clonedTvList.getTime((int) 
i));
     }
-    Assert.assertEquals(tvList.memoryBinaryChunkSize, 
clonedTvList.memoryBinaryChunkSize);
-  }
-
-  @Test
-  public void testCalculateChunkSize() {
-    BinaryTVList tvList = BinaryTVList.newList();
-    for (int i = 0; i < 10; i++) {
-      tvList.putBinary(i, BytesUtils.valueOf(String.valueOf(i)));
-    }
-    Assert.assertEquals(tvList.memoryBinaryChunkSize, 360);
-
-    Binary[] binaryList = new Binary[10];
-    List<Long> timeList = new ArrayList<>();
-    BitMap bitMap = new BitMap(10);
-    for (int i = 0; i < 10; i++) {
-      timeList.add((long) i + 10);
-      binaryList[i] = BytesUtils.valueOf(String.valueOf(i));
-      if (i % 2 == 0) {
-        bitMap.mark(i);
-      }
-    }
-    tvList.putBinaries(
-        ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), binaryList, 
bitMap, 0, 10);
-    Assert.assertEquals(tvList.memoryBinaryChunkSize, 540);
-
-    tvList.delete(5, 15);
-    Assert.assertEquals(tvList.memoryBinaryChunkSize, 252);
-
-    tvList.clear();
-    Assert.assertEquals(tvList.memoryBinaryChunkSize, 0);
   }
 }

Reply via email to