This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ef8dc52e89f perf: add encodeBatch interface to accelerate flushing for
sequential insert (#15243)
ef8dc52e89f is described below
commit ef8dc52e89f8fa074c9beb5c814b8780aa972ef0
Author: shizy <[email protected]>
AuthorDate: Tue Apr 8 10:26:48 2025 +0800
perf: add encodeBatch interface to accelerate flushing for sequential
insert (#15243)
* perf: batch encode to improve insert performance
* fix IoTDBSimpleQueryIT
* move batchEncodeInfo / times as MemTableFlushTask member
---
.../dataregion/flush/MemTableFlushTask.java | 15 +-
.../memtable/AbstractWritableMemChunk.java | 4 +-
.../memtable/AlignedWritableMemChunk.java | 114 ++++-----------
.../dataregion/memtable/IWritableMemChunk.java | 3 +-
.../dataregion/memtable/TsFileProcessor.java | 59 ++++----
.../dataregion/memtable/WritableMemChunk.java | 93 +++----------
.../db/utils/datastructure/AlignedTVList.java | 154 ++++++++++++++++++++-
...{MemPointIterator.java => BatchEncodeInfo.java} | 27 +++-
.../db/utils/datastructure/MemPointIterator.java | 3 +
.../MergeSortMultiAlignedTVListIterator.java | 105 ++++++++++++++
.../MergeSortMultiTVListIterator.java | 77 ++++++++++-
.../utils/datastructure/MultiTVListIterator.java | 11 +-
.../OrderedMultiAlignedTVListIterator.java | 15 ++
.../datastructure/OrderedMultiTVListIterator.java | 31 ++++-
.../iotdb/db/utils/datastructure/TVList.java | 119 +++++++++++++---
15 files changed, 595 insertions(+), 235 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index a14bc85e17a..17fce5dd197 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -27,12 +27,15 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import
org.apache.iotdb.db.storageengine.dataregion.flush.pool.FlushSubTaskPoolManager;
+import
org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedWritableMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
import
org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.write.chunk.IChunkWriter;
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -62,6 +65,9 @@ public class MemTableFlushTask {
FlushSubTaskPoolManager.getInstance();
private static final WritingMetrics WRITING_METRICS =
WritingMetrics.getInstance();
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
+ TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
/* storage group name -> last time */
private static final Map<String, Long> flushPointsCache = new
ConcurrentHashMap<>();
private final Future<?> encodingTaskFuture;
@@ -82,6 +88,9 @@ public class MemTableFlushTask {
private volatile long memSerializeTime = 0L;
private volatile long ioTime = 0L;
+ private final BatchEncodeInfo encodeInfo;
+ private long[] times;
+
/**
* @param memTable the memTable to flush
* @param writer the writer where memTable will be flushed to (current
tsfile writer or vm writer)
@@ -98,6 +107,7 @@ public class MemTableFlushTask {
this.dataRegionId = dataRegionId;
this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
+ this.encodeInfo = new BatchEncodeInfo(0, 0, 0);
LOGGER.debug(
"flush task of database {} memtable is created, flushing to file {}.",
storageGroup,
@@ -248,7 +258,10 @@ public class MemTableFlushTask {
} else {
long starTime = System.currentTimeMillis();
IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
- writableMemChunk.encode(ioTaskQueue);
+ if (writableMemChunk instanceof AlignedWritableMemChunk && times
== null) {
+ times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
+ }
+ writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
long subTaskTime = System.currentTimeMillis() - starTime;
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK,
subTaskTime);
memSerializeTime += subTaskTime;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index d4b4cfde383..ac71e3da6bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.tsfile.enums.TSDataType;
@@ -192,7 +193,8 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
public abstract IChunkWriter createIChunkWriter();
@Override
- public abstract void encode(BlockingQueue<Object> ioTaskQueue);
+ public abstract void encode(
+ BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[]
times);
@Override
public abstract void release();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index b6db03ec294..013b2d6109c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
import org.apache.iotdb.db.utils.datastructure.MemPointIteratorFactory;
import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -32,14 +33,12 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.TimeRange;
-import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.chunk.IChunkWriter;
-import org.apache.tsfile.write.chunk.ValueChunkWriter;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -65,6 +64,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
private final List<IMeasurementSchema> schemaList;
private AlignedTVList list;
private List<AlignedTVList> sortedList;
+ private long sortedRowCount = 0;
private final boolean ignoreAllNullRows;
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
@@ -197,6 +197,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
list.sort();
}
sortedList.add(list);
+ this.sortedRowCount += list.rowCount();
this.list = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes));
this.dataTypes = list.getTsDataTypes();
}
@@ -352,15 +353,11 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
@Override
public long rowCount() {
- return alignedListSize();
+ return sortedRowCount + list.rowCount();
}
public int alignedListSize() {
- int rowCount = list.rowCount();
- for (AlignedTVList alignedTvList : sortedList) {
- rowCount += alignedTvList.rowCount();
- }
- return rowCount;
+ return (int) rowCount();
}
@Override
@@ -638,107 +635,56 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
}
@Override
- public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
+ public synchronized void encode(
+ BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[]
times) {
if (TVLIST_SORT_THRESHOLD == 0) {
encodeWorkingAlignedTVList(ioTaskQueue);
return;
}
AlignedChunkWriterImpl alignedChunkWriter = new
AlignedChunkWriterImpl(schemaList);
+
// create MergeSortAlignedTVListIterator.
List<AlignedTVList> alignedTvLists = new ArrayList<>(sortedList);
alignedTvLists.add(list);
+ List<Integer> columnIndexList = buildColumnIndexList(schemaList);
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(dataTypes, null, alignedTvLists,
ignoreAllNullRows);
-
- int pointNumInPage = 0;
- int pointNumInChunk = 0;
- long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
+ MemPointIteratorFactory.create(
+ dataTypes, columnIndexList, alignedTvLists, ignoreAllNullRows);
while (timeValuePairIterator.hasNextBatch()) {
- TsBlock tsBlock = timeValuePairIterator.nextBatch();
- if (tsBlock == null) {
- continue;
+ timeValuePairIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
+ if (encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
+ encodeInfo.pointNumInPage = 0;
}
- for (int rowIndex = 0; rowIndex < tsBlock.getPositionCount();
rowIndex++) {
- long time = tsBlock.getTimeByIndex(rowIndex);
- times[pointNumInPage] = time;
- for (int columnIndex = 0; columnIndex < dataTypes.size();
columnIndex++) {
- ValueChunkWriter valueChunkWriter =
- alignedChunkWriter.getValueChunkWriterByIndex(columnIndex);
- if (tsBlock.getColumn(columnIndex).isNull(rowIndex)) {
- valueChunkWriter.write(time, null, true);
- continue;
- }
- switch (schemaList.get(columnIndex).getType()) {
- case BOOLEAN:
- valueChunkWriter.write(
- time, tsBlock.getColumn(columnIndex).getBoolean(rowIndex),
false);
- break;
- case INT32:
- case DATE:
- valueChunkWriter.write(time,
tsBlock.getColumn(columnIndex).getInt(rowIndex), false);
- break;
- case INT64:
- case TIMESTAMP:
- valueChunkWriter.write(time,
tsBlock.getColumn(columnIndex).getLong(rowIndex), false);
- break;
- case FLOAT:
- valueChunkWriter.write(
- time, tsBlock.getColumn(columnIndex).getFloat(rowIndex),
false);
- break;
- case DOUBLE:
- valueChunkWriter.write(
- time, tsBlock.getColumn(columnIndex).getDouble(rowIndex),
false);
- break;
- case TEXT:
- case BLOB:
- case STRING:
- valueChunkWriter.write(
- time, tsBlock.getColumn(columnIndex).getBinary(rowIndex),
false);
- break;
- default:
- break;
- }
- }
- pointNumInPage++;
- pointNumInChunk++;
-
- // new page
- if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE
- || pointNumInChunk >= maxNumberOfPointsInChunk) {
- alignedChunkWriter.write(times, pointNumInPage, 0);
- pointNumInPage = 0;
- }
-
- // new chunk
- if (pointNumInChunk >= maxNumberOfPointsInChunk) {
- alignedChunkWriter.sealCurrentPage();
- alignedChunkWriter.clearPageWriter();
- try {
- ioTaskQueue.put(alignedChunkWriter);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
- pointNumInChunk = 0;
+ if (encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk) {
+ alignedChunkWriter.sealCurrentPage();
+ alignedChunkWriter.clearPageWriter();
+ try {
+ ioTaskQueue.put(alignedChunkWriter);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
+ alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
+ encodeInfo.reset();
}
}
// last batch of points
- if (pointNumInChunk > 0) {
- if (pointNumInPage > 0) {
- alignedChunkWriter.write(times, pointNumInPage, 0);
- alignedChunkWriter.sealCurrentPage();
- alignedChunkWriter.clearPageWriter();
+ if (encodeInfo.pointNumInChunk > 0) {
+ if (encodeInfo.pointNumInPage > 0) {
+ alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
}
+ alignedChunkWriter.sealCurrentPage();
+ alignedChunkWriter.clearPageWriter();
try {
ioTaskQueue.put(alignedChunkWriter);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
+ encodeInfo.reset();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 26abdf1d39c..13f09a05d8a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.tsfile.enums.TSDataType;
@@ -111,7 +112,7 @@ public interface IWritableMemChunk extends WALEntryValue {
IChunkWriter createIChunkWriter();
- void encode(BlockingQueue<Object> ioTaskQueue);
+ void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo,
long[] times);
void release();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index ecd6dc2be39..d56868671ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -652,17 +652,16 @@ public class TsFileProcessor {
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
- if (workMemTable.chunkNotExist(deviceId, measurements[i])) {
+ IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId,
measurements[i]);
+ if (memChunk == null) {
// ChunkMetadataIncrement
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
} else {
// here currentChunkPointNum >= 1
- IWritableMemChunk memChunk =
workMemTable.getWritableMemChunk(deviceId, measurements[i]);
- long currentChunkPointNum = memChunk != null ? memChunk.rowCount() : 0;
+ long currentChunkPointNum = memChunk.rowCount();
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
- memTableIncrement +=
- memChunk != null ?
memChunk.getWorkingTVList().tvListArrayMemCost() : 0;
+ memTableIncrement +=
memChunk.getWorkingTVList().tvListArrayMemCost();
}
}
// TEXT data mem size
@@ -693,7 +692,8 @@ public class TsFileProcessor {
if (dataTypes[i] == null || measurements[i] == null) {
continue;
}
- if (workMemTable.chunkNotExist(deviceId, measurements[i])
+ IWritableMemChunk memChunk =
workMemTable.getWritableMemChunk(deviceId, measurements[i]);
+ if (memChunk == null
&& (!increasingMemTableInfo.containsKey(deviceId)
||
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
// ChunkMetadataIncrement
@@ -704,7 +704,6 @@ public class TsFileProcessor {
.putIfAbsent(measurements[i], 1);
} else {
// here currentChunkPointNum >= 1
- IWritableMemChunk memChunk =
workMemTable.getWritableMemChunk(deviceId, measurements[i]);
int addingPointNum =
increasingMemTableInfo
.computeIfAbsent(deviceId, k -> new HashMap<>())
@@ -741,7 +740,9 @@ public class TsFileProcessor {
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
- if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
+ IWritableMemChunk memChunk =
+ workMemTable.getWritableMemChunk(deviceId,
AlignedPath.VECTOR_PLACEHOLDER);
+ if (memChunk == null) {
// For new device of this mem table
// ChunkMetadataIncrement
chunkMetadataIncrement +=
@@ -750,9 +751,7 @@ public class TsFileProcessor {
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes,
columnCategories);
} else {
// For existed device of this mem table
- AlignedWritableMemChunk alignedMemChunk =
- ((AlignedWritableMemChunkGroup)
workMemTable.getMemTableMap().get(deviceId))
- .getAlignedMemChunk();
+ AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk)
memChunk;
List<TSDataType> dataTypesInTVList = new ArrayList<>();
for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
@@ -775,8 +774,7 @@ public class TsFileProcessor {
}
// this insertion will result in a new array
if ((alignedMemChunk.alignedListSize() %
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
- dataTypesInTVList.addAll(
- ((AlignedTVList)
alignedMemChunk.getWorkingTVList()).getTsDataTypes());
+
dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes());
memTableIncrement +=
alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost();
}
}
@@ -805,8 +803,10 @@ public class TsFileProcessor {
TSDataType[] dataTypes = insertRowNode.getDataTypes();
Object[] values = insertRowNode.getValues();
String[] measurements = insertRowNode.getMeasurements();
- if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)
- && !increasingMemTableInfo.containsKey(deviceId)) {
+
+ IWritableMemChunk memChunk =
+ workMemTable.getWritableMemChunk(deviceId,
AlignedPath.VECTOR_PLACEHOLDER);
+ if (memChunk == null && !increasingMemTableInfo.containsKey(deviceId)) {
// For new device of this mem table
// ChunkMetadataIncrement
chunkMetadataIncrement +=
@@ -829,10 +829,7 @@ public class TsFileProcessor {
} else {
// For existed device of this mem table
- AlignedWritableMemChunkGroup memChunkGroup =
- (AlignedWritableMemChunkGroup)
workMemTable.getMemTableMap().get(deviceId);
- AlignedWritableMemChunk alignedMemChunk =
- memChunkGroup == null ? null : memChunkGroup.getAlignedMemChunk();
+ AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk)
memChunk;
int currentChunkPointNum = alignedMemChunk == null ? 0 :
alignedMemChunk.alignedListSize();
List<TSDataType> dataTypesInTVList = new ArrayList<>();
Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
@@ -867,8 +864,7 @@ public class TsFileProcessor {
// Here currentChunkPointNum + addingPointNum >= 1
if (((currentChunkPointNum + addingPointNum) %
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
if (alignedMemChunk != null) {
- dataTypesInTVList.addAll(
- ((AlignedTVList)
alignedMemChunk.getWorkingTVList()).getTsDataTypes());
+
dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes());
}
dataTypesInTVList.addAll(addingPointNumInfo.left.values());
memTableIncrement +=
@@ -969,21 +965,19 @@ public class TsFileProcessor {
Object column) {
// memIncrements = [memTable, text, chunk metadata] respectively
- if (workMemTable.chunkNotExist(deviceId, measurement)) {
+ IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId,
measurement);
+ if (memChunk == null) {
// ChunkMetadataIncrement
memIncrements[2] += ChunkMetadata.calculateRamSize(measurement,
dataType);
memIncrements[0] +=
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
* TVList.tvListArrayMemCost(dataType);
} else {
- IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId,
measurement);
- long currentChunkPointNum = memChunk != null ? memChunk.rowCount() : 0;
+ long currentChunkPointNum = memChunk.rowCount();
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
memIncrements[0] +=
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
- * (memChunk != null
- ? memChunk.getWorkingTVList().tvListArrayMemCost()
- : TVList.tvListArrayMemCost(dataType));
+ * memChunk.getWorkingTVList().tvListArrayMemCost();
} else {
long acquireArray =
(end - start - 1 + (currentChunkPointNum %
PrimitiveArrayManager.ARRAY_SIZE))
@@ -1035,7 +1029,9 @@ public class TsFileProcessor {
}
// memIncrements = [memTable, text, chunk metadata] respectively
- if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
+ IWritableMemChunk memChunk =
+ workMemTable.getWritableMemChunk(deviceId,
AlignedPath.VECTOR_PLACEHOLDER);
+ if (memChunk == null) {
// new devices introduce new ChunkMetadata
// ChunkMetadata memory Increment
memIncrements[2] +=
@@ -1049,9 +1045,7 @@ public class TsFileProcessor {
memIncrements[0] +=
numArraysToAdd * AlignedTVList.alignedTvListArrayMemCost(dataTypes,
columnCategories);
} else {
- AlignedWritableMemChunk alignedMemChunk =
- ((AlignedWritableMemChunkGroup)
workMemTable.getMemTableMap().get(deviceId))
- .getAlignedMemChunk();
+ AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk)
memChunk;
List<TSDataType> dataTypesInTVList = new ArrayList<>();
int currentPointNum = alignedMemChunk.alignedListSize();
int newPointNum = currentPointNum + incomingPointNum;
@@ -1086,8 +1080,7 @@ public class TsFileProcessor {
if (acquireArray != 0) {
// memory of extending the TVList
- dataTypesInTVList.addAll(
- ((AlignedTVList)
alignedMemChunk.getWorkingTVList()).getTsDataTypes());
+
dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes());
memIncrements[0] +=
acquireArray *
alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 5c8a41621ff..86bbbde0bdd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
import org.apache.iotdb.db.utils.datastructure.MemPointIteratorFactory;
import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -31,7 +32,6 @@ import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.common.TimeRange;
-import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -57,6 +57,7 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
private IMeasurementSchema schema;
private TVList list;
private List<TVList> sortedList;
+ private long sortedRowCount = 0;
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
private static final Logger LOGGER =
LoggerFactory.getLogger(WritableMemChunk.class);
@@ -79,6 +80,7 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
list.sort();
}
sortedList.add(list);
+ this.sortedRowCount += list.rowCount();
this.list = TVList.newList(schema.getType());
}
@@ -272,11 +274,7 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
@Override
public long rowCount() {
- long rowCount = list.rowCount();
- for (TVList tvList : sortedList) {
- rowCount += tvList.rowCount();
- }
- return rowCount;
+ return sortedRowCount + list.rowCount();
}
@Override
@@ -458,16 +456,17 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
}
@Override
- public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
+ public synchronized void encode(
+ BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[]
times) {
if (TVLIST_SORT_THRESHOLD == 0) {
encodeWorkingTVList(ioTaskQueue);
return;
}
- TSDataType tsDataType = schema.getType();
ChunkWriterImpl chunkWriterImpl = createIChunkWriter();
- long dataSizeInCurrentChunk = 0;
- int pointNumInCurrentChunk = 0;
+ if (sortedList.isEmpty()) {
+ encodeInfo.lastIterator = true;
+ }
// create MultiTvListIterator. It need not handle float/double precision
here.
List<TVList> tvLists = new ArrayList<>(sortedList);
@@ -476,70 +475,21 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
MemPointIteratorFactory.create(schema.getType(), tvLists);
while (timeValuePairIterator.hasNextBatch()) {
- TsBlock tsBlock = timeValuePairIterator.nextBatch();
- if (tsBlock == null) {
- continue;
- }
-
- for (int rowIndex = 0; rowIndex < tsBlock.getPositionCount();
rowIndex++) {
- long time = tsBlock.getTimeByIndex(rowIndex);
- // store last point for SDT
- if (rowIndex + 1 == tsBlock.getPositionCount() &&
!timeValuePairIterator.hasNextBatch()) {
- chunkWriterImpl.setLastPoint(true);
- }
-
- switch (tsDataType) {
- case BOOLEAN:
- chunkWriterImpl.write(time,
tsBlock.getColumn(0).getBoolean(rowIndex));
- dataSizeInCurrentChunk += 8L + 1L;
- break;
- case INT32:
- case DATE:
- chunkWriterImpl.write(time, tsBlock.getColumn(0).getInt(rowIndex));
- dataSizeInCurrentChunk += 8L + 4L;
- break;
- case INT64:
- case TIMESTAMP:
- chunkWriterImpl.write(time,
tsBlock.getColumn(0).getLong(rowIndex));
- dataSizeInCurrentChunk += 8L + 8L;
- break;
- case FLOAT:
- chunkWriterImpl.write(time,
tsBlock.getColumn(0).getFloat(rowIndex));
- dataSizeInCurrentChunk += 8L + 4L;
- break;
- case DOUBLE:
- chunkWriterImpl.write(time,
tsBlock.getColumn(0).getDouble(rowIndex));
- dataSizeInCurrentChunk += 8L + 8L;
- break;
- case TEXT:
- case BLOB:
- case STRING:
- Binary value = tsBlock.getColumn(0).getBinary(rowIndex);
- chunkWriterImpl.write(time, value);
- dataSizeInCurrentChunk += 8L + getBinarySize(value);
- break;
- default:
- LOGGER.error("WritableMemChunk does not support data type: {}",
tsDataType);
- break;
- }
-
- pointNumInCurrentChunk++;
- if (pointNumInCurrentChunk > MAX_NUMBER_OF_POINTS_IN_CHUNK
- || dataSizeInCurrentChunk > TARGET_CHUNK_SIZE) {
- chunkWriterImpl.sealCurrentPage();
- chunkWriterImpl.clearPageWriter();
- try {
- ioTaskQueue.put(chunkWriterImpl);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- chunkWriterImpl = createIChunkWriter();
- dataSizeInCurrentChunk = 0;
- pointNumInCurrentChunk = 0;
+ timeValuePairIterator.encodeBatch(chunkWriterImpl, encodeInfo, times);
+ if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
+ || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+ chunkWriterImpl.sealCurrentPage();
+ chunkWriterImpl.clearPageWriter();
+ try {
+ ioTaskQueue.put(chunkWriterImpl);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
+ chunkWriterImpl = createIChunkWriter();
+ encodeInfo.resetPointAndSize();
}
}
- if (pointNumInCurrentChunk != 0) {
+ if (encodeInfo.pointNumInChunk != 0) {
chunkWriterImpl.sealCurrentPage();
chunkWriterImpl.clearPageWriter();
try {
@@ -547,6 +497,7 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
+ encodeInfo.reset();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 548fa4d76b7..1cb913873fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
@@ -44,6 +45,9 @@ import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkWriter;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
import java.io.DataInputStream;
import java.io.IOException;
@@ -1566,6 +1570,8 @@ public abstract class AlignedTVList extends TVList {
private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ private long maxNumberOfPointsInChunk =
+ IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
public AlignedTVListIterator(
List<TSDataType> dataTypeList,
@@ -1591,6 +1597,10 @@ public abstract class AlignedTVList extends TVList {
for (int i = 0; i < dataTypeList.size(); i++) {
valueColumnDeleteCursor.add(new int[] {0});
}
+ int avgPointSizeOfLargestColumn = getAvgPointSizeOfLargestColumn();
+ long TARGET_CHUNK_SIZE =
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+ maxNumberOfPointsInChunk =
+ Math.min(maxNumberOfPointsInChunk, (TARGET_CHUNK_SIZE /
avgPointSizeOfLargestColumn));
}
@Override
@@ -1756,7 +1766,7 @@ public abstract class AlignedTVList extends TVList {
int startIndex = index;
// time column
for (; index < rows; index++) {
- if (validRowCount > MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ if (validRowCount >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
break;
}
// skip empty row
@@ -1773,10 +1783,10 @@ public abstract class AlignedTVList extends TVList {
|| (isTimeDeleted(nextRowIndex)))) {
nextRowIndex++;
}
- long timestamp = getTime(index);
- if ((nextRowIndex == rows || timestamp != getTime(nextRowIndex))
- && !isPointDeleted(timestamp, timeColumnDeletion, deleteCursor)) {
- timeBuilder.writeLong(getTime(index));
+ long time = getTime(index);
+ if ((nextRowIndex == rows || time != getTime(nextRowIndex))
+ && !isPointDeleted(time, timeColumnDeletion, deleteCursor)) {
+ timeBuilder.writeLong(time);
validRowCount++;
} else {
if (Objects.isNull(timeInvalidInfo)) {
@@ -1937,6 +1947,140 @@ public abstract class AlignedTVList extends TVList {
return builder.build();
}
+ @Override
+ public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo
encodeInfo, long[] times) {
+ AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl)
chunkWriter;
+
+ // duplicated time or deleted time are all invalid, true if we don't
need this row
+ BitMap timeDuplicateInfo = null;
+
+ int startIndex = index;
+ // time column
+ for (; index < rows; index++) {
+ if (encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk
+ || encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ break;
+ }
+ // skip empty row
+ if (allValueColDeletedMap != null &&
allValueColDeletedMap.isMarked(getValueIndex(index))) {
+ continue;
+ }
+ if (isTimeDeleted(index)) {
+ continue;
+ }
+ int nextRowIndex = index + 1;
+ while (nextRowIndex < rows
+ && ((allValueColDeletedMap != null
+ &&
allValueColDeletedMap.isMarked(getValueIndex(nextRowIndex)))
+ || (isTimeDeleted(nextRowIndex)))) {
+ nextRowIndex++;
+ }
+ long time = getTime(index);
+ if (nextRowIndex == rows || time != getTime(nextRowIndex)) {
+ times[encodeInfo.pointNumInPage++] = time;
+ encodeInfo.pointNumInChunk++;
+ } else {
+ if (Objects.isNull(timeDuplicateInfo)) {
+ timeDuplicateInfo = new BitMap(rows);
+ }
+ timeDuplicateInfo.mark(index);
+ }
+ index = nextRowIndex - 1;
+ }
+
+ int columnCount = dataTypeList.size();
+ // value columns
+ for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
+ ValueChunkWriter valueChunkWriter =
+ alignedChunkWriter.getValueChunkWriterByIndex(columnIndex);
+ int validColumnIndex = columnIndexList.get(columnIndex);
+
+ // Pair of Time and Index
+ Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
+ if (Objects.nonNull(timeDuplicateInfo)) {
+ lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE,
null);
+ }
+ for (int sortedRowIndex = startIndex; sortedRowIndex < index;
sortedRowIndex++) {
+ // skip empty row
+ if ((allValueColDeletedMap != null
+ &&
allValueColDeletedMap.isMarked(getValueIndex(sortedRowIndex)))
+ || (isTimeDeleted(sortedRowIndex))) {
+ continue;
+ }
+ long time = getTime(sortedRowIndex);
+ // skip time duplicated or totally deleted rows
+ if (Objects.nonNull(timeDuplicateInfo)) {
+ if (!outer.isNullValue(getValueIndex(sortedRowIndex),
validColumnIndex)) {
+ lastValidPointIndexForTimeDupCheck.left =
getTime(sortedRowIndex);
+ lastValidPointIndexForTimeDupCheck.right =
getValueIndex(sortedRowIndex);
+ }
+ if (timeDuplicateInfo.isMarked(sortedRowIndex)) {
+ continue;
+ }
+ }
+
+ // The part of code solves the following problem:
+ // Time: 1,2,2,3
+ // Value: 1,2,null,null
+ // When rowIndex:1, pair(min,null), timeDuplicateInfo:false,
write(T:1,V:1)
+ // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing
value
+ // When rowIndex:3, pair(2,2), timeDuplicateInfo:false,
T:2==pair.left:2, write(T:2,V:2)
+ // When rowIndex:4, pair(2,2), timeDuplicateInfo:false,
T:3!=pair.left:2,
+ // write(T:3,V:null)
+ int originRowIndex;
+ if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
+ && (getTime(sortedRowIndex) ==
lastValidPointIndexForTimeDupCheck.left)) {
+ originRowIndex = lastValidPointIndexForTimeDupCheck.right;
+ } else {
+ originRowIndex = getValueIndex(sortedRowIndex);
+ }
+
+ boolean isNull = outer.isNullValue(originRowIndex, validColumnIndex);
+ switch (dataTypeList.get(columnIndex)) {
+ case BOOLEAN:
+ valueChunkWriter.write(
+ time,
+ !isNull && getBooleanByValueIndex(originRowIndex,
validColumnIndex),
+ isNull);
+ break;
+ case INT32:
+ case DATE:
+ valueChunkWriter.write(
+ time, isNull ? 0 : getIntByValueIndex(originRowIndex,
validColumnIndex), isNull);
+ break;
+ case INT64:
+ case TIMESTAMP:
+ valueChunkWriter.write(
+ time, isNull ? 0 : getLongByValueIndex(originRowIndex,
validColumnIndex), isNull);
+ break;
+ case FLOAT:
+ valueChunkWriter.write(
+ time,
+ isNull ? 0 : getFloatByValueIndex(originRowIndex,
validColumnIndex),
+ isNull);
+ break;
+ case DOUBLE:
+ valueChunkWriter.write(
+ time,
+ isNull ? 0 : getDoubleByValueIndex(originRowIndex,
validColumnIndex),
+ isNull);
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ valueChunkWriter.write(
+ time,
+ isNull ? null : getBinaryByValueIndex(originRowIndex,
validColumnIndex),
+ isNull);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ probeNext = false;
+ }
+
public int[] getSelectedIndices() {
return selectedIndices;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
similarity index 57%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
index b6864a27cb4..b836524438c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
@@ -19,13 +19,28 @@
package org.apache.iotdb.db.utils.datastructure;
-import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.reader.IPointReader;
/**
 * Mutable bookkeeping used while batch-encoding points during flush: it tracks
 * how many points have gone into the current page and chunk, the estimated
 * byte size of the current chunk, and whether the iterator currently being
 * drained is the last data source.
 */
public class BatchEncodeInfo {
  /** Number of points written into the current page so far. */
  public int pointNumInPage;
  /** Number of points written into the current chunk so far. */
  public int pointNumInChunk;
  /** Estimated size in bytes of the data written into the current chunk. */
  public long dataSizeInChunk;
  /** Whether the iterator being encoded is the final source; starts false. */
  public boolean lastIterator;

  public BatchEncodeInfo(int pointNumInPage, int pointNumInChunk, long dataSizeInChunk) {
    this.pointNumInPage = pointNumInPage;
    this.pointNumInChunk = pointNumInChunk;
    this.dataSizeInChunk = dataSizeInChunk;
    this.lastIterator = false;
  }

  /** Clears all counters and the last-iterator flag. */
  public void reset() {
    resetPointAndSize();
    lastIterator = false;
  }

  /** Clears the point counters and chunk size but keeps the last-iterator flag. */
  public void resetPointAndSize() {
    pointNumInPage = 0;
    pointNumInChunk = 0;
    dataSizeInChunk = 0;
  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
index b6864a27cb4..66fc547db20 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIterator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.write.chunk.IChunkWriter;
public interface MemPointIterator extends IPointReader {
TsBlock getBatch(int tsBlockIndex);
@@ -28,4 +29,6 @@ public interface MemPointIterator extends IPointReader {
boolean hasNextBatch();
TsBlock nextBatch();
+
+ void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo encodeInfo,
long[] times);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
index a4ae369b3a8..8ff8597d51d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
@@ -19,11 +19,18 @@
package org.apache.iotdb.db.utils.datastructure;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkWriter;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
import java.util.ArrayList;
import java.util.Iterator;
@@ -47,6 +54,9 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
new PriorityQueue<>(
(a, b) -> a.left.equals(b.left) ? b.right.compareTo(a.right) :
a.left.compareTo(b.left));
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private long maxNumberOfPointsInChunk = CONFIG.getTargetChunkPointNum();
+
public MergeSortMultiAlignedTVListIterator(
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
@@ -75,6 +85,17 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
valueColumnDeleteCursor.add(new int[] {0});
}
this.ignoreAllNullRows = ignoreAllNullRows;
+
+ if (!alignedTvLists.isEmpty()) {
+ int avgPointSizeOfLargestColumn =
+ alignedTvLists.stream()
+ .mapToInt(AlignedTVList::getAvgPointSizeOfLargestColumn)
+ .max()
+ .getAsInt();
+ long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
+ maxNumberOfPointsInChunk =
+ Math.min(maxNumberOfPointsInChunk, (TARGET_CHUNK_SIZE /
avgPointSizeOfLargestColumn));
+ }
}
@Override
@@ -164,6 +185,90 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
probeNext = false;
}
+ @Override
+ public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo
encodeInfo, long[] times) {
+ AlignedChunkWriterImpl alignedChunkWriterImpl = (AlignedChunkWriterImpl)
chunkWriter;
+ while (hasNextTimeValuePair()) {
+ times[encodeInfo.pointNumInPage] = currentTime;
+ for (int columnIndex = 0; columnIndex < tsDataTypeList.size();
columnIndex++) {
+ ValueChunkWriter valueChunkWriter =
+ alignedChunkWriterImpl.getValueChunkWriterByIndex(columnIndex);
+ AlignedTVList alignedTVList =
+
alignedTvListIterators.get(currentIteratorIndex(columnIndex)).getAlignedTVList();
+
+ // sanity check
+ int validColumnIndex =
+ columnIndexList != null ? columnIndexList.get(columnIndex) :
columnIndex;
+ if (validColumnIndex < 0 || validColumnIndex >=
alignedTVList.dataTypes.size()) {
+ valueChunkWriter.write(currentTime, null, true);
+ continue;
+ }
+ int valueIndex =
alignedTVList.getValueIndex(currentRowIndex(columnIndex));
+
+ // null value
+ if (alignedTVList.isNullValue(valueIndex, validColumnIndex)) {
+ valueChunkWriter.write(currentTime, null, true);
+ continue;
+ }
+
+ switch (tsDataTypeList.get(columnIndex)) {
+ case BOOLEAN:
+ valueChunkWriter.write(
+ currentTime,
+ alignedTVList.getBooleanByValueIndex(valueIndex,
validColumnIndex),
+ false);
+ break;
+ case INT32:
+ case DATE:
+ valueChunkWriter.write(
+ currentTime, alignedTVList.getIntByValueIndex(valueIndex,
validColumnIndex), false);
+ break;
+ case INT64:
+ case TIMESTAMP:
+ valueChunkWriter.write(
+ currentTime,
+ alignedTVList.getLongByValueIndex(valueIndex,
validColumnIndex),
+ false);
+ break;
+ case FLOAT:
+ valueChunkWriter.write(
+ currentTime,
+ alignedTVList.getFloatByValueIndex(valueIndex,
validColumnIndex),
+ false);
+ break;
+ case DOUBLE:
+ valueChunkWriter.write(
+ currentTime,
+ alignedTVList.getDoubleByValueIndex(valueIndex,
validColumnIndex),
+ false);
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ valueChunkWriter.write(
+ currentTime,
+ alignedTVList.getBinaryByValueIndex(valueIndex,
validColumnIndex),
+ false);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.",
tsDataTypeList.get(columnIndex)));
+ }
+ }
+ next();
+ encodeInfo.pointNumInPage++;
+ encodeInfo.pointNumInChunk++;
+
+ // new page
+ if (encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE
+ || encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk) {
+ alignedChunkWriterImpl.write(times, encodeInfo.pointNumInPage, 0);
+ encodeInfo.pointNumInPage = 0;
+ break;
+ }
+ }
+ }
+
@Override
protected int currentIteratorIndex(int columnIndex) {
return iteratorIndices[columnIndex];
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
index 1436d519cb5..2eb59c416ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
@@ -19,17 +19,30 @@
package org.apache.iotdb.db.utils.datastructure;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkWriter;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
+
public class MergeSortMultiTVListIterator extends MultiTVListIterator {
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
+ private final long MAX_NUMBER_OF_POINTS_IN_CHUNK =
CONFIG.getTargetChunkPointNum();
+
private final List<Integer> probeIterators;
private final PriorityQueue<Pair<Long, Integer>> minHeap =
new PriorityQueue<>(
@@ -59,13 +72,15 @@ public class MergeSortMultiTVListIterator extends
MultiTVListIterator {
if (!minHeap.isEmpty()) {
Pair<Long, Integer> top = minHeap.poll();
+ currentTime = top.left;
+ probeIterators.add(top.right);
+
iteratorIndex = top.right;
- probeIterators.add(iteratorIndex);
rowIndex = tvListIterators.get(iteratorIndex).getIndex();
hasNext = true;
// duplicated timestamps
- while (!minHeap.isEmpty() && minHeap.peek().left.longValue() ==
top.left.longValue()) {
+ while (!minHeap.isEmpty() && minHeap.peek().left == currentTime) {
Pair<Long, Integer> element = minHeap.poll();
probeIterators.add(element.right);
}
@@ -80,4 +95,62 @@ public class MergeSortMultiTVListIterator extends
MultiTVListIterator {
}
probeNext = false;
}
+
+ @Override
+ public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo
encodeInfo, long[] times) {
+ ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
+ while (hasNextTimeValuePair()) {
+ // remember current iterator and row index
+ TVList.TVListIterator currIterator = tvListIterators.get(iteratorIndex);
+ int row = rowIndex;
+ long time = currentTime;
+
+ // check if it is last point
+ next();
+ if (!hasNextTimeValuePair()) {
+ chunkWriterImpl.setLastPoint(true);
+ }
+
+ switch (tsDataType) {
+ case BOOLEAN:
+ chunkWriterImpl.write(time,
currIterator.getTVList().getBoolean(row));
+ encodeInfo.dataSizeInChunk += 8L + 1L;
+ break;
+ case INT32:
+ case DATE:
+ chunkWriterImpl.write(time, currIterator.getTVList().getInt(row));
+ encodeInfo.dataSizeInChunk += 8L + 4L;
+ break;
+ case INT64:
+ case TIMESTAMP:
+ chunkWriterImpl.write(time, currIterator.getTVList().getLong(row));
+ encodeInfo.dataSizeInChunk += 8L + 8L;
+ break;
+ case FLOAT:
+ chunkWriterImpl.write(time, currIterator.getTVList().getFloat(row));
+ encodeInfo.dataSizeInChunk += 8L + 4L;
+ break;
+ case DOUBLE:
+ chunkWriterImpl.write(time, currIterator.getTVList().getDouble(row));
+ encodeInfo.dataSizeInChunk += 8L + 8L;
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ Binary value = currIterator.getTVList().getBinary(row);
+ chunkWriterImpl.write(time, value);
+ encodeInfo.dataSizeInChunk += 8L + getBinarySize(value);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", tsDataType));
+ }
+ encodeInfo.pointNumInChunk++;
+
+ if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
+ || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+ break;
+ }
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
index cb203a5abb9..d400f629971 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
@@ -42,6 +42,7 @@ public abstract class MultiTVListIterator implements
MemPointIterator {
protected boolean probeNext = false;
protected boolean hasNext = false;
+ protected long currentTime = 0;
protected int iteratorIndex = 0;
protected int rowIndex = 0;
@@ -79,9 +80,7 @@ public abstract class MultiTVListIterator implements
MemPointIterator {
}
TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
TimeValuePair currentTvPair =
- iterator
- .getTVList()
- .getTimeValuePair(rowIndex, iterator.currentTime(),
floatPrecision, encoding);
+ iterator.getTVList().getTimeValuePair(rowIndex, currentTime,
floatPrecision, encoding);
next();
return currentTvPair;
}
@@ -92,9 +91,7 @@ public abstract class MultiTVListIterator implements
MemPointIterator {
return null;
}
TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
- return iterator
- .getTVList()
- .getTimeValuePair(rowIndex, iterator.currentTime(), floatPrecision,
encoding);
+ return iterator.getTVList().getTimeValuePair(rowIndex, currentTime,
floatPrecision, encoding);
}
@Override
@@ -107,7 +104,7 @@ public abstract class MultiTVListIterator implements
MemPointIterator {
TsBlockBuilder builder = new
TsBlockBuilder(Collections.singletonList(tsDataType));
while (hasNextTimeValuePair() && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
- builder.getTimeColumnBuilder().writeLong(iterator.currentTime());
+ builder.getTimeColumnBuilder().writeLong(currentTime);
switch (tsDataType) {
case BOOLEAN:
builder.getColumnBuilder(0).writeBoolean(iterator.getTVList().getBoolean(rowIndex));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
index 75a94b4fcb4..dc7c35f6de1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
@@ -23,6 +23,7 @@ import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.chunk.IChunkWriter;
import java.util.ArrayList;
import java.util.List;
@@ -104,6 +105,20 @@ public class OrderedMultiAlignedTVListIterator extends
MultiAlignedTVListIterato
probeNext = false;
}
+ @Override
+ public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo
encodeInfo, long[] times) {
+ while (iteratorIndex < alignedTvListIterators.size()) {
+ TVList.TVListIterator iterator =
alignedTvListIterators.get(iteratorIndex);
+ if (!iterator.hasNextBatch()) {
+ iteratorIndex++;
+ continue;
+ }
+ iterator.encodeBatch(chunkWriter, encodeInfo, times);
+ break;
+ }
+ probeNext = false;
+ }
+
@Override
protected int currentIteratorIndex(int columnIndex) {
return iteratorIndex;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
index 48e58aceab1..e20ae061f75 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.utils.datastructure;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.write.chunk.IChunkWriter;
import java.util.List;
@@ -38,13 +39,14 @@ public class OrderedMultiTVListIterator extends
MultiTVListIterator {
@Override
protected void prepareNext() {
hasNext = false;
- while (iteratorIndex < tvListIterators.size() - 1
- && !tvListIterators.get(iteratorIndex).hasNextTimeValuePair()) {
- iteratorIndex++;
- }
- TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
- if (iterator.hasNextTimeValuePair()) {
+ while (iteratorIndex < tvListIterators.size() && !hasNext) {
+ TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
+ if (!iterator.hasNextTimeValuePair()) {
+ iteratorIndex++;
+ continue;
+ }
rowIndex = iterator.getIndex();
+ currentTime = iterator.currentTime();
hasNext = true;
}
probeNext = true;
@@ -55,4 +57,21 @@ public class OrderedMultiTVListIterator extends
MultiTVListIterator {
tvListIterators.get(iteratorIndex).next();
probeNext = false;
}
+
+ @Override
+ public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo
encodeInfo, long[] times) {
+ while (iteratorIndex < tvListIterators.size()) {
+ TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
+ if (!iterator.hasNextBatch()) {
+ iteratorIndex++;
+ continue;
+ }
+ if (iteratorIndex == tvListIterators.size() - 1) {
+ encodeInfo.lastIterator = true;
+ }
+ iterator.encodeBatch(chunkWriter, encodeInfo, times);
+ break;
+ }
+ probeNext = false;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index b4392e7866a..5b0a5382a76 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
@@ -36,6 +37,9 @@ import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.chunk.IChunkWriter;
import java.io.DataInputStream;
import java.io.IOException;
@@ -49,6 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import static
org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE;
+import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
@@ -660,6 +665,10 @@ public abstract class TVList implements WALEntryValue {
private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ private final long TARGET_CHUNK_SIZE =
+ IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
+ private final long MAX_NUMBER_OF_POINTS_IN_CHUNK =
+ IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
public TVListIterator(
List<TimeRange> deletionList, Integer floatPrecision, TSEncoding
encoding) {
@@ -733,10 +742,11 @@ public abstract class TVList implements WALEntryValue {
switch (dataType) {
case BOOLEAN:
while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ long time = getTime(index);
if (!isNullValue(getValueIndex(index))
- && !isPointDeleted(getTime(index), deletionList, deleteCursor)
- && (index == rows - 1 || getTime(index) != getTime(index +
1))) {
- builder.getTimeColumnBuilder().writeLong(getTime(index));
+ && !isPointDeleted(time, deletionList, deleteCursor)
+ && (index == rows - 1 || time != getTime(index + 1))) {
+ builder.getTimeColumnBuilder().writeLong(time);
builder.getColumnBuilder(0).writeBoolean(getBoolean(index));
builder.declarePosition();
}
@@ -746,10 +756,11 @@ public abstract class TVList implements WALEntryValue {
case INT32:
case DATE:
while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ long time = getTime(index);
if (!isNullValue(getValueIndex(index))
- && !isPointDeleted(getTime(index), deletionList, deleteCursor)
- && (index == rows - 1 || getTime(index) != getTime(index +
1))) {
- builder.getTimeColumnBuilder().writeLong(getTime(index));
+ && !isPointDeleted(time, deletionList, deleteCursor)
+ && (index == rows - 1 || time != getTime(index + 1))) {
+ builder.getTimeColumnBuilder().writeLong(time);
builder.getColumnBuilder(0).writeInt(getInt(index));
builder.declarePosition();
}
@@ -759,10 +770,11 @@ public abstract class TVList implements WALEntryValue {
case INT64:
case TIMESTAMP:
while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ long time = getTime(index);
if (!isNullValue(getValueIndex(index))
- && !isPointDeleted(getTime(index), deletionList, deleteCursor)
- && (index == rows - 1 || getTime(index) != getTime(index +
1))) {
- builder.getTimeColumnBuilder().writeLong(getTime(index));
+ && !isPointDeleted(time, deletionList, deleteCursor)
+ && (index == rows - 1 || time != getTime(index + 1))) {
+ builder.getTimeColumnBuilder().writeLong(time);
builder.getColumnBuilder(0).writeLong(getLong(index));
builder.declarePosition();
}
@@ -771,10 +783,11 @@ public abstract class TVList implements WALEntryValue {
break;
case FLOAT:
while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ long time = getTime(index);
if (!isNullValue(getValueIndex(index))
- && !isPointDeleted(getTime(index), deletionList, deleteCursor)
- && (index == rows - 1 || getTime(index) != getTime(index +
1))) {
- builder.getTimeColumnBuilder().writeLong(getTime(index));
+ && !isPointDeleted(time, deletionList, deleteCursor)
+ && (index == rows - 1 || time != getTime(index + 1))) {
+ builder.getTimeColumnBuilder().writeLong(time);
builder
.getColumnBuilder(0)
.writeFloat(
@@ -786,10 +799,11 @@ public abstract class TVList implements WALEntryValue {
break;
case DOUBLE:
while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ long time = getTime(index);
if (!isNullValue(getValueIndex(index))
- && !isPointDeleted(getTime(index), deletionList, deleteCursor)
- && (index == rows - 1 || getTime(index) != getTime(index +
1))) {
- builder.getTimeColumnBuilder().writeLong(getTime(index));
+ && !isPointDeleted(time, deletionList, deleteCursor)
+ && (index == rows - 1 || time != getTime(index + 1))) {
+ builder.getTimeColumnBuilder().writeLong(time);
builder
.getColumnBuilder(0)
.writeDouble(
@@ -803,22 +817,91 @@ public abstract class TVList implements WALEntryValue {
case BLOB:
case STRING:
while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ long time = getTime(index);
if (!isNullValue(getValueIndex(index))
- && !isPointDeleted(getTime(index), deletionList, deleteCursor)
- && (index == rows - 1 || getTime(index) != getTime(index +
1))) {
- builder.getTimeColumnBuilder().writeLong(getTime(index));
+ && !isPointDeleted(time, deletionList, deleteCursor)
+ && (index == rows - 1 || time != getTime(index + 1))) {
+ builder.getTimeColumnBuilder().writeLong(time);
builder.getColumnBuilder(0).writeBinary(getBinary(index));
builder.declarePosition();
}
index++;
}
break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", dataType));
}
TsBlock tsBlock = builder.build();
tsBlocks.add(tsBlock);
return tsBlock;
}
+ @Override
+ public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo
encodeInfo, long[] times) {
+ TSDataType dataType = getDataType();
+ ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
+ for (; index < rows; index++) {
+ if (isNullValue(getValueIndex(index))) {
+ continue;
+ }
+ long time = getTime(index);
+ while (index + 1 < rows && time == getTime(index + 1)) {
+ index++;
+ }
+ // store last point for SDT
+ if (encodeInfo.lastIterator) {
+ // skip deleted rows
+ while (index < rows && isNullValue(getValueIndex(index))) {
+ index++;
+ }
+ if (index == rows || index == rows - 1) {
+ chunkWriterImpl.setLastPoint(true);
+ }
+ }
+
+ switch (dataType) {
+ case BOOLEAN:
+ chunkWriterImpl.write(time, getBoolean(index));
+ encodeInfo.dataSizeInChunk += 8L + 1L;
+ break;
+ case INT32:
+ case DATE:
+ chunkWriterImpl.write(time, getInt(index));
+ encodeInfo.dataSizeInChunk += 8L + 4L;
+ break;
+ case INT64:
+ case TIMESTAMP:
+ chunkWriterImpl.write(time, getLong(index));
+ encodeInfo.dataSizeInChunk += 8L + 8L;
+ break;
+ case FLOAT:
+ chunkWriterImpl.write(time, getFloat(index));
+ encodeInfo.dataSizeInChunk += 8L + 4L;
+ break;
+ case DOUBLE:
+ chunkWriterImpl.write(time, getDouble(index));
+ encodeInfo.dataSizeInChunk += 8L + 8L;
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ Binary value = getBinary(index);
+ chunkWriterImpl.write(time, value);
+ encodeInfo.dataSizeInChunk += 8L + getBinarySize(value);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", dataType));
+ }
+ encodeInfo.pointNumInChunk++;
+ if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
+ || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+ break;
+ }
+ }
+ }
+
@Override
public long getUsedMemorySize() {
return 0;