This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch split_text_chunk in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit de38a7de25d3606c3f8228e16475b296f8b49a48 Author: HTHou <[email protected]> AuthorDate: Thu Apr 25 14:41:39 2024 +0800 Split non_aligned charge text chunk --- .../dataregion/flush/MemTableFlushTask.java | 11 +----- .../dataregion/memtable/IWritableMemChunk.java | 3 +- .../dataregion/memtable/WritableMemChunk.java | 40 +++++++++++++++++----- .../iotdb/db/utils/datastructure/BinaryTVList.java | 20 ----------- .../db/utils/datastructure/BinaryTVListTest.java | 30 ---------------- 5 files changed, 35 insertions(+), 69 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index 45a398b96bb..c3aea6bd2e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -247,16 +247,7 @@ public class MemTableFlushTask { } else { long starTime = System.currentTimeMillis(); IWritableMemChunk writableMemChunk = (IWritableMemChunk) task; - IChunkWriter seriesWriter = writableMemChunk.createIChunkWriter(); - writableMemChunk.encode(seriesWriter); - seriesWriter.sealCurrentPage(); - seriesWriter.clearPageWriter(); - try { - ioTaskQueue.put(seriesWriter); - } catch (InterruptedException e) { - LOGGER.error("Put task into ioTaskQueue Interrupted"); - Thread.currentThread().interrupt(); - } + writableMemChunk.encode(ioTaskQueue); long subTaskTime = System.currentTimeMillis() - starTime; WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime); memSerializeTime += subTaskTime; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java index 8e83b60b899..2c9f8d20354 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java @@ -28,6 +28,7 @@ import org.apache.tsfile.write.chunk.IChunkWriter; import org.apache.tsfile.write.schema.IMeasurementSchema; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; public interface IWritableMemChunk extends WALEntryValue { @@ -134,7 +135,7 @@ public interface IWritableMemChunk extends WALEntryValue { IChunkWriter createIChunkWriter(); - void encode(IChunkWriter chunkWriter); + void encode(LinkedBlockingQueue<Object> ioTaskQueue); void release(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index a110f772652..1040a068900 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -30,7 +30,6 @@ import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.UnSupportedDataTypeException; import org.apache.tsfile.write.chunk.ChunkWriterImpl; -import org.apache.tsfile.write.chunk.IChunkWriter; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; @@ -40,12 +39,18 @@ import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.iotdb.db.utils.MemUtils.getBinarySize; public class WritableMemChunk implements IWritableMemChunk { private IMeasurementSchema schema; private TVList list; private static final String UNSUPPORTED_TYPE = "Unsupported data type:"; + + private static final long TARGET_CHUNK_SIZE = + IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); private static final Logger LOGGER = LoggerFactory.getLogger(WritableMemChunk.class); private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); @@ -285,7 +290,7 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override - public IChunkWriter createIChunkWriter() { + public ChunkWriterImpl createIChunkWriter() { return new ChunkWriterImpl(schema); } @@ -322,15 +327,14 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override - public void encode(IChunkWriter chunkWriter) { - - ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter; + public void encode(LinkedBlockingQueue<Object> ioTaskQueue) { + TSDataType tsDataType = schema.getType(); + ChunkWriterImpl chunkWriterImpl = createIChunkWriter(); + long binarySizePerChunk = 0; for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) { long time = list.getTime(sortedRowIndex); - TSDataType tsDataType = schema.getType(); - // skip duplicated data if ((sortedRowIndex + 1 < list.rowCount() && (time == list.getTime(sortedRowIndex + 1)))) { long recordSize = @@ -364,13 +368,33 @@ public class WritableMemChunk implements IWritableMemChunk { chunkWriterImpl.write(time, list.getDouble(sortedRowIndex)); break; case TEXT: - chunkWriterImpl.write(time, list.getBinary(sortedRowIndex)); + Binary value = list.getBinary(sortedRowIndex); + chunkWriterImpl.write(time, value); + binarySizePerChunk += getBinarySize(value); + if (binarySizePerChunk > TARGET_CHUNK_SIZE) { + chunkWriterImpl.sealCurrentPage(); + chunkWriterImpl.clearPageWriter(); + try { + ioTaskQueue.put(chunkWriterImpl); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + chunkWriterImpl = createIChunkWriter(); + binarySizePerChunk = 0; + } break; default: LOGGER.error("WritableMemChunk does not support data type: {}", tsDataType); break; } } + chunkWriterImpl.sealCurrentPage(); + chunkWriterImpl.clearPageWriter(); + try { + ioTaskQueue.put(chunkWriterImpl); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index 8f90296d774..7ade12a86f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -39,20 +39,15 @@ import java.util.List; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; -import static org.apache.iotdb.db.utils.MemUtils.getBinarySize; public abstract class BinaryTVList extends TVList { // list of primitive array, add 1 when expanded -> Binary primitive array // index relation: arrayIndex -> elementIndex protected List<Binary[]> values; - // record total memory size of binary tvlist - long memoryBinaryChunkSize; - BinaryTVList() { super(); values = new ArrayList<>(); - memoryBinaryChunkSize = 0; } public static BinaryTVList newList() { @@ -70,7 +65,6 @@ public abstract class BinaryTVList extends TVList { public TimBinaryTVList clone() { TimBinaryTVList cloneList = new TimBinaryTVList(); cloneAs(cloneList); - cloneList.memoryBinaryChunkSize = memoryBinaryChunkSize; for (Binary[] valueArray : values) { cloneList.values.add(cloneValue(valueArray)); } @@ -95,12 +89,6 @@ public abstract class BinaryTVList extends TVList { if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) { sorted = false; } - memoryBinaryChunkSize += getBinarySize(value); - } - - @Override - public boolean reachMaxChunkSizeThreshold() { - return memoryBinaryChunkSize >= TARGET_CHUNK_SIZE; } @Override @@ -112,8 +100,6 @@ public abstract class BinaryTVList extends TVList { if (time < lowerBound || time > upperBound) { set(i, newSize++); maxTime = Math.max(maxTime, time); - } else { - memoryBinaryChunkSize -= getBinarySize(getBinary(i)); } } int deletedNumber = rowCount - newSize; @@ -159,7 +145,6 @@ public abstract class BinaryTVList extends TVList { } values.clear(); } - memoryBinaryChunkSize = 0; } @Override @@ -222,11 +207,6 @@ public abstract class BinaryTVList extends TVList { updateMaxTimeAndSorted(time, start, end); } - // update raw size - for (int i = idx; i < end; i++) { - memoryBinaryChunkSize += getBinarySize(value[i]); - } - while (idx < end) { int inputRemaining = end - idx; int arrayIdx = rowCount / ARRAY_SIZE; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java index 1208ce4878d..0ac18c375c3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java @@ -107,35 +107,5 @@ public class BinaryTVListTest { Assert.assertEquals(tvList.getBinary((int) i), clonedTvList.getBinary((int) i)); Assert.assertEquals(tvList.getTime((int) i), clonedTvList.getTime((int) i)); } - Assert.assertEquals(tvList.memoryBinaryChunkSize, clonedTvList.memoryBinaryChunkSize); - } - - @Test - public void testCalculateChunkSize() { - BinaryTVList tvList = BinaryTVList.newList(); - for (int i = 0; i < 10; i++) { - tvList.putBinary(i, BytesUtils.valueOf(String.valueOf(i))); - } - Assert.assertEquals(tvList.memoryBinaryChunkSize, 360); - - Binary[] binaryList = new Binary[10]; - List<Long> timeList = new ArrayList<>(); - BitMap bitMap = new BitMap(10); - for (int i = 0; i < 10; i++) { - timeList.add((long) i + 10); - binaryList[i] = BytesUtils.valueOf(String.valueOf(i)); - if (i % 2 == 0) { - bitMap.mark(i); - } - } - tvList.putBinaries( - ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), binaryList, bitMap, 0, 10); - Assert.assertEquals(tvList.memoryBinaryChunkSize, 540); - - tvList.delete(5, 15); - Assert.assertEquals(tvList.memoryBinaryChunkSize, 252); - - tvList.clear(); - Assert.assertEquals(tvList.memoryBinaryChunkSize, 0); } }
