This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 040f3af358c refactor: load max page&chunk configuration in
ReadOnlyMemchunk and M… (#15309)
040f3af358c is described below
commit 040f3af358c6c77ec7c1d7c1ecac1194aa3632cd
Author: shizy <[email protected]>
AuthorDate: Fri Apr 18 09:38:22 2025 +0800
refactor: load max page&chunk configuration in ReadOnlyMemchunk and M…
(#15309)
* refactor: load max page&chunk configuration in ReadOnlyMemchunk and
MemTableFlushTask
* bug: fix testFlushMultiAlignedBinaryChunks
---
.../dataregion/flush/MemTableFlushTask.java | 12 +-
.../memtable/AlignedReadOnlyMemChunk.java | 6 +-
.../memtable/AlignedWritableMemChunk.java | 52 ++++--
.../dataregion/memtable/ReadOnlyMemChunk.java | 19 +-
.../dataregion/memtable/WritableMemChunk.java | 19 +-
.../db/utils/datastructure/AlignedTVList.java | 28 ++-
.../db/utils/datastructure/BatchEncodeInfo.java | 16 +-
.../datastructure/MemPointIteratorFactory.java | 194 ++++++++++++++-------
.../MergeSortMultiAlignedTVListIterator.java | 27 +--
.../MergeSortMultiTVListIterator.java | 15 +-
.../datastructure/MultiAlignedTVListIterator.java | 14 +-
.../utils/datastructure/MultiTVListIterator.java | 13 +-
.../OrderedMultiAlignedTVListIterator.java | 6 +-
.../datastructure/OrderedMultiTVListIterator.java | 5 +-
.../iotdb/db/utils/datastructure/TVList.java | 39 +++--
.../reader/chunk/MemAlignedChunkLoaderTest.java | 5 +-
.../read/reader/chunk/MemChunkLoaderTest.java | 19 +-
17 files changed, 295 insertions(+), 194 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 17fce5dd197..6977167f0e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -107,7 +107,17 @@ public class MemTableFlushTask {
this.dataRegionId = dataRegionId;
this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
- this.encodeInfo = new BatchEncodeInfo(0, 0, 0);
+
+ long MAX_NUMBER_OF_POINTS_IN_CHUNK = config.getTargetChunkPointNum();
+ long TARGET_CHUNK_SIZE = config.getTargetChunkSize();
+ this.encodeInfo =
+ new BatchEncodeInfo(
+ 0,
+ 0,
+ 0,
+ MAX_NUMBER_OF_POINTS_IN_PAGE,
+ MAX_NUMBER_OF_POINTS_IN_CHUNK,
+ TARGET_CHUNK_SIZE);
LOGGER.debug(
"flush task of database {} memtable is created, flushing to file {}.",
storageGroup,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
index 72caba755ea..5ad706df121 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
@@ -147,7 +147,8 @@ public class AlignedReadOnlyMemChunk extends
ReadOnlyMemChunk {
valueColumnsDeletionList,
floatPrecision,
encodingList,
- context.isIgnoreAllNullRows());
+ context.isIgnoreAllNullRows(),
+ MAX_NUMBER_OF_POINTS_IN_PAGE);
while (timeValuePairIterator.hasNextBatch()) {
// create pageTimeStatistics and pageValueStatistics for new page
@@ -327,7 +328,8 @@ public class AlignedReadOnlyMemChunk extends
ReadOnlyMemChunk {
valueColumnsDeletionList,
floatPrecision,
encodingList,
- context.isIgnoreAllNullRows());
+ context.isIgnoreAllNullRows(),
+ MAX_NUMBER_OF_POINTS_IN_PAGE);
while (timeValuePairIterator.hasNextTimeValuePair()) {
TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 013b2d6109c..d20866d0c79 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -30,7 +30,6 @@ import
org.apache.iotdb.db.utils.datastructure.MemPointIterator;
import org.apache.iotdb.db.utils.datastructure.MemPointIteratorFactory;
import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Binary;
@@ -68,11 +67,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
private final boolean ignoreAllNullRows;
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
- private long maxNumberOfPointsInChunk = CONFIG.getTargetChunkPointNum();
private final int TVLIST_SORT_THRESHOLD = CONFIG.getTvListSortThreshold();
- private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
- TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
@@ -439,12 +434,12 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
}
@SuppressWarnings({"squid:S6541", "squid:S3776"})
- public void encodeWorkingAlignedTVList(BlockingQueue<Object> ioTaskQueue) {
+ public void encodeWorkingAlignedTVList(
+ BlockingQueue<Object> ioTaskQueue,
+ long maxNumberOfPointsInChunk,
+ int maxNumberOfPointsInPage) {
BitMap allValueColDeletedMap;
allValueColDeletedMap = ignoreAllNullRows ?
list.getAllValueColDeletedMap() : null;
- int avgPointSizeOfLargestColumn = list.getAvgPointSizeOfLargestColumn();
- maxNumberOfPointsInChunk =
- Math.min(maxNumberOfPointsInChunk, (TARGET_CHUNK_SIZE /
avgPointSizeOfLargestColumn));
boolean[] timeDuplicateInfo = null;
@@ -463,7 +458,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
}
pointNumInPage++;
pointNumInChunk++;
- if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ if (pointNumInPage == maxNumberOfPointsInPage) {
pageRange.add(sortedRowIndex);
pointNumInPage = 0;
}
@@ -500,14 +495,16 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
chunkRange.add(pageRange);
}
- handleEncoding(ioTaskQueue, chunkRange, timeDuplicateInfo,
allValueColDeletedMap);
+ handleEncoding(
+ ioTaskQueue, chunkRange, timeDuplicateInfo, allValueColDeletedMap,
maxNumberOfPointsInPage);
}
private void handleEncoding(
BlockingQueue<Object> ioTaskQueue,
List<List<Integer>> chunkRange,
boolean[] timeDuplicateInfo,
- BitMap allValueColDeletedMap) {
+ BitMap allValueColDeletedMap,
+ int maxNumberOfPointsInPage) {
List<TSDataType> dataTypes = list.getTsDataTypes();
Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new
Pair[dataTypes.size()];
for (List<Integer> pageRange : chunkRange) {
@@ -607,7 +604,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
alignedChunkWriter.nextColumn();
}
- long[] times = new long[Math.min(MAX_NUMBER_OF_POINTS_IN_PAGE,
list.rowCount())];
+ long[] times = new long[Math.min(maxNumberOfPointsInPage,
list.rowCount())];
int pointsInPage = 0;
for (int sortedRowIndex = pageRange.get(pageNum * 2);
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
@@ -637,8 +634,14 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
@Override
public synchronized void encode(
BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[]
times) {
+ encodeInfo.maxNumberOfPointsInChunk =
+ Math.min(
+ encodeInfo.maxNumberOfPointsInChunk,
+ (encodeInfo.targetChunkSize / getAvgPointSizeOfLargestColumn()));
+
if (TVLIST_SORT_THRESHOLD == 0) {
- encodeWorkingAlignedTVList(ioTaskQueue);
+ encodeWorkingAlignedTVList(
+ ioTaskQueue, encodeInfo.maxNumberOfPointsInChunk,
encodeInfo.maxNumberOfPointsInPage);
return;
}
@@ -650,16 +653,20 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
List<Integer> columnIndexList = buildColumnIndexList(schemaList);
MemPointIterator timeValuePairIterator =
MemPointIteratorFactory.create(
- dataTypes, columnIndexList, alignedTvLists, ignoreAllNullRows);
+ dataTypes,
+ columnIndexList,
+ alignedTvLists,
+ ignoreAllNullRows,
+ encodeInfo.maxNumberOfPointsInPage);
while (timeValuePairIterator.hasNextBatch()) {
timeValuePairIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
- if (encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage) {
alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
encodeInfo.pointNumInPage = 0;
}
- if (encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk) {
+ if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk) {
alignedChunkWriter.sealCurrentPage();
alignedChunkWriter.clearPageWriter();
try {
@@ -833,4 +840,15 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
}
return columnIndexList;
}
+
+ // Choose maximum avgPointSizeOfLargestColumn among working and sorted
AlignedTVList as
+ // approximate calculation
+ public int getAvgPointSizeOfLargestColumn() {
+ int avgPointSizeOfLargestColumn = list.getAvgPointSizeOfLargestColumn();
+ for (AlignedTVList alignedTVList : sortedList) {
+ avgPointSizeOfLargestColumn =
+ Math.max(avgPointSizeOfLargestColumn,
alignedTVList.getAvgPointSizeOfLargestColumn());
+ }
+ return avgPointSizeOfLargestColumn;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
index 0b4b5d98ec8..8fa93f3dc41 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
@@ -138,7 +138,13 @@ public class ReadOnlyMemChunk {
Statistics<? extends Serializable> chunkStatistics =
Statistics.getStatsByType(dataType);
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
timeValuePairIterator =
- MemPointIteratorFactory.create(dataType, tvLists, deletionList,
floatPrecision, encoding);
+ MemPointIteratorFactory.create(
+ dataType,
+ tvLists,
+ deletionList,
+ floatPrecision,
+ encoding,
+ MAX_NUMBER_OF_POINTS_IN_PAGE);
while (timeValuePairIterator.hasNextBatch()) {
// statistics for current batch
Statistics<? extends Serializable> pageStatistics =
Statistics.getStatsByType(dataType);
@@ -248,7 +254,12 @@ public class ReadOnlyMemChunk {
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
MemPointIterator timeValuePairIterator =
MemPointIteratorFactory.create(
- getDataType(), tvLists, deletionList, floatPrecision, encoding);
+ getDataType(),
+ tvLists,
+ deletionList,
+ floatPrecision,
+ encoding,
+ MAX_NUMBER_OF_POINTS_IN_PAGE);
while (timeValuePairIterator.hasNextTimeValuePair()) {
TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
@@ -323,8 +334,4 @@ public class ReadOnlyMemChunk {
public MemPointIterator getMemPointIterator() {
return timeValuePairIterator;
}
-
- public int getMaxNumberOfPointsInPage() {
- return MAX_NUMBER_OF_POINTS_IN_PAGE;
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 86bbbde0bdd..1f4078dfc7d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -63,8 +63,6 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
private static final Logger LOGGER =
LoggerFactory.getLogger(WritableMemChunk.class);
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
- private final long MAX_NUMBER_OF_POINTS_IN_CHUNK =
CONFIG.getTargetChunkPointNum();
private final int TVLIST_SORT_THRESHOLD = CONFIG.getTvListSortThreshold();
public WritableMemChunk(IMeasurementSchema schema) {
@@ -372,7 +370,8 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
return out.toString();
}
- public void encodeWorkingTVList(BlockingQueue<Object> ioTaskQueue) {
+ public void encodeWorkingTVList(
+ BlockingQueue<Object> ioTaskQueue, long maxNumberOfPointsInChunk, long
targetChunkSize) {
TSDataType tsDataType = schema.getType();
ChunkWriterImpl chunkWriterImpl = createIChunkWriter();
@@ -430,8 +429,8 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
break;
}
pointNumInCurrentChunk++;
- if (pointNumInCurrentChunk > MAX_NUMBER_OF_POINTS_IN_CHUNK
- || dataSizeInCurrentChunk > TARGET_CHUNK_SIZE) {
+ if (pointNumInCurrentChunk > maxNumberOfPointsInChunk
+ || dataSizeInCurrentChunk > targetChunkSize) {
chunkWriterImpl.sealCurrentPage();
chunkWriterImpl.clearPageWriter();
try {
@@ -459,7 +458,8 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
public synchronized void encode(
BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[]
times) {
if (TVLIST_SORT_THRESHOLD == 0) {
- encodeWorkingTVList(ioTaskQueue);
+ encodeWorkingTVList(
+ ioTaskQueue, encodeInfo.maxNumberOfPointsInChunk,
encodeInfo.targetChunkSize);
return;
}
@@ -472,12 +472,13 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
List<TVList> tvLists = new ArrayList<>(sortedList);
tvLists.add(list);
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(schema.getType(), tvLists);
+ MemPointIteratorFactory.create(
+ schema.getType(), tvLists, encodeInfo.maxNumberOfPointsInPage);
while (timeValuePairIterator.hasNextBatch()) {
timeValuePairIterator.encodeBatch(chunkWriterImpl, encodeInfo, times);
- if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
- || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+ if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk
+ || encodeInfo.dataSizeInChunk >= encodeInfo.targetChunkSize) {
chunkWriterImpl.sealCurrentPage();
chunkWriterImpl.clearPageWriter();
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 1cb913873fb..d3a27e0b804 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
@@ -30,7 +29,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TimeValuePair;
@@ -1539,7 +1537,8 @@ public abstract class AlignedTVList extends TVList {
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
List<TSEncoding> encodingList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
return new AlignedTVListIterator(
dataTypeList,
columnIndexList,
@@ -1547,7 +1546,8 @@ public abstract class AlignedTVList extends TVList {
valueColumnsDeletionList,
floatPrecision,
encodingList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
/* AlignedTVList Iterator */
@@ -1568,11 +1568,6 @@ public abstract class AlignedTVList extends TVList {
private final int[] timeDeleteCursor = {0};
private final List<int[]> valueColumnDeleteCursor = new ArrayList<>();
- private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
-
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
- private long maxNumberOfPointsInChunk =
- IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
-
public AlignedTVListIterator(
List<TSDataType> dataTypeList,
List<Integer> columnIndexList,
@@ -1580,8 +1575,9 @@ public abstract class AlignedTVList extends TVList {
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
List<TSEncoding> encodingList,
- boolean ignoreAllNullRows) {
- super(null, null, null);
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
+ super(null, null, null, maxNumberOfPointsInPage);
this.dataTypeList = dataTypeList;
this.columnIndexList =
(columnIndexList == null)
@@ -1597,10 +1593,6 @@ public abstract class AlignedTVList extends TVList {
for (int i = 0; i < dataTypeList.size(); i++) {
valueColumnDeleteCursor.add(new int[] {0});
}
- int avgPointSizeOfLargestColumn = getAvgPointSizeOfLargestColumn();
- long TARGET_CHUNK_SIZE =
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
- maxNumberOfPointsInChunk =
- Math.min(maxNumberOfPointsInChunk, (TARGET_CHUNK_SIZE /
avgPointSizeOfLargestColumn));
}
@Override
@@ -1766,7 +1758,7 @@ public abstract class AlignedTVList extends TVList {
int startIndex = index;
// time column
for (; index < rows; index++) {
- if (validRowCount >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ if (validRowCount >= maxNumberOfPointsInPage) {
break;
}
// skip empty row
@@ -1957,8 +1949,8 @@ public abstract class AlignedTVList extends TVList {
int startIndex = index;
// time column
for (; index < rows; index++) {
- if (encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk
- || encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk
+ || encodeInfo.pointNumInPage >=
encodeInfo.maxNumberOfPointsInPage) {
break;
}
// skip empty row
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
index b836524438c..d4b3d7e9ccc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
@@ -21,15 +21,29 @@ package org.apache.iotdb.db.utils.datastructure;
// BatchEncodeInfo struct
public class BatchEncodeInfo {
+ // used by encode/encodeBatch during flush
+ public int maxNumberOfPointsInPage;
+ public long maxNumberOfPointsInChunk;
+ public long targetChunkSize;
+
public int pointNumInPage;
public int pointNumInChunk;
public long dataSizeInChunk;
public boolean lastIterator;
- public BatchEncodeInfo(int pointNumInPage, int pointNumInChunk, long
dataSizeInChunk) {
+ public BatchEncodeInfo(
+ int pointNumInPage,
+ int pointNumInChunk,
+ long dataSizeInChunk,
+ int maxNumberOfPointsInPage,
+ long maxNumberOfPointsInChunk,
+ long targetChunkSize) {
this.pointNumInPage = pointNumInPage;
this.pointNumInChunk = pointNumInChunk;
this.dataSizeInChunk = dataSizeInChunk;
+ this.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
+ this.maxNumberOfPointsInChunk = maxNumberOfPointsInChunk;
+ this.targetChunkSize = targetChunkSize;
this.lastIterator = false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
index 2359bbb2cd8..422a557d171 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
@@ -31,30 +31,38 @@ public class MemPointIteratorFactory {
}
// TVListIterator
- private static MemPointIterator single(List<TVList> tvLists) {
- return tvLists.get(0).iterator(null, null, null);
+ private static MemPointIterator single(List<TVList> tvLists, int
maxNumberOfPointsInPage) {
+ return tvLists.get(0).iterator(null, null, null, maxNumberOfPointsInPage);
}
- private static MemPointIterator single(List<TVList> tvLists, List<TimeRange>
deletionList) {
- return tvLists.get(0).iterator(deletionList, null, null);
+ private static MemPointIterator single(
+ List<TVList> tvLists, List<TimeRange> deletionList, int
maxNumberOfPointsInPage) {
+ return tvLists.get(0).iterator(deletionList, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator single(
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
- return tvLists.get(0).iterator(deletionList, floatPrecision, encoding);
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
+ return tvLists.get(0).iterator(deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
// MergeSortMultiTVListIterator
- private static MemPointIterator mergeSort(TSDataType tsDataType,
List<TVList> tvLists) {
- return new MergeSortMultiTVListIterator(tsDataType, tvLists, null, null,
null);
+ private static MemPointIterator mergeSort(
+ TSDataType tsDataType, List<TVList> tvLists, int
maxNumberOfPointsInPage) {
+ return new MergeSortMultiTVListIterator(
+ tsDataType, tvLists, null, null, null, maxNumberOfPointsInPage);
}
private static MemPointIterator mergeSort(
- TSDataType tsDataType, List<TVList> tvLists, List<TimeRange>
deletionList) {
- return new MergeSortMultiTVListIterator(tsDataType, tvLists, deletionList,
null, null);
+ TSDataType tsDataType,
+ List<TVList> tvLists,
+ List<TimeRange> deletionList,
+ int maxNumberOfPointsInPage) {
+ return new MergeSortMultiTVListIterator(
+ tsDataType, tvLists, deletionList, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator mergeSort(
@@ -62,19 +70,26 @@ public class MemPointIteratorFactory {
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
return new MergeSortMultiTVListIterator(
- tsDataType, tvLists, deletionList, floatPrecision, encoding);
+ tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
// OrderedMultiTVListIterator
- private static MemPointIterator ordered(TSDataType tsDataType, List<TVList>
tvLists) {
- return new OrderedMultiTVListIterator(tsDataType, tvLists, null, null,
null);
+ private static MemPointIterator ordered(
+ TSDataType tsDataType, List<TVList> tvLists, int
maxNumberOfPointsInPage) {
+ return new OrderedMultiTVListIterator(
+ tsDataType, tvLists, null, null, null, maxNumberOfPointsInPage);
}
private static MemPointIterator ordered(
- TSDataType tsDataType, List<TVList> tvLists, List<TimeRange>
deletionList) {
- return new OrderedMultiTVListIterator(tsDataType, tvLists, deletionList,
null, null);
+ TSDataType tsDataType,
+ List<TVList> tvLists,
+ List<TimeRange> deletionList,
+ int maxNumberOfPointsInPage) {
+ return new OrderedMultiTVListIterator(
+ tsDataType, tvLists, deletionList, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator ordered(
@@ -82,9 +97,10 @@ public class MemPointIteratorFactory {
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
return new OrderedMultiTVListIterator(
- tsDataType, tvLists, deletionList, floatPrecision, encoding);
+ tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
// AlignedTVListIterator
@@ -92,10 +108,19 @@ public class MemPointIteratorFactory {
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
return alignedTvLists
.get(0)
- .iterator(tsDataTypes, columnIndexList, null, null, null, null,
ignoreAllNullRows);
+ .iterator(
+ tsDataTypes,
+ columnIndexList,
+ null,
+ null,
+ null,
+ null,
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator single(
@@ -104,7 +129,8 @@ public class MemPointIteratorFactory {
List<AlignedTVList> alignedTvLists,
List<TimeRange> timeColumnDeletion,
List<List<TimeRange>> valueColumnsDeletionList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
return alignedTvLists
.get(0)
.iterator(
@@ -114,7 +140,8 @@ public class MemPointIteratorFactory {
valueColumnsDeletionList,
null,
null,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator single(
@@ -125,7 +152,8 @@ public class MemPointIteratorFactory {
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
List<TSEncoding> encodingList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
return alignedTvLists
.get(0)
.iterator(
@@ -135,7 +163,8 @@ public class MemPointIteratorFactory {
valueColumnsDeletionList,
floatPrecision,
encodingList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
// MergeSortMultiAlignedTVListIterator
@@ -143,9 +172,18 @@ public class MemPointIteratorFactory {
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
return new MergeSortMultiAlignedTVListIterator(
- tsDataTypes, columnIndexList, alignedTvLists, null, null, null, null,
ignoreAllNullRows);
+ tsDataTypes,
+ columnIndexList,
+ alignedTvLists,
+ null,
+ null,
+ null,
+ null,
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator mergeSort(
@@ -154,7 +192,8 @@ public class MemPointIteratorFactory {
List<AlignedTVList> alignedTvLists,
List<TimeRange> timeColumnDeletion,
List<List<TimeRange>> valueColumnsDeletionList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
return new MergeSortMultiAlignedTVListIterator(
tsDataTypes,
columnIndexList,
@@ -163,7 +202,8 @@ public class MemPointIteratorFactory {
valueColumnsDeletionList,
null,
null,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator mergeSort(
@@ -174,7 +214,8 @@ public class MemPointIteratorFactory {
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
List<TSEncoding> encodingList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
return new MergeSortMultiAlignedTVListIterator(
tsDataTypes,
columnIndexList,
@@ -183,7 +224,8 @@ public class MemPointIteratorFactory {
valueColumnsDeletionList,
floatPrecision,
encodingList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
// OrderedMultiAlignedTVListIterator
@@ -191,9 +233,18 @@ public class MemPointIteratorFactory {
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
return new OrderedMultiAlignedTVListIterator(
- tsDataTypes, columnIndexList, alignedTvLists, null, null, null, null,
ignoreAllNullRows);
+ tsDataTypes,
+ columnIndexList,
+ alignedTvLists,
+ null,
+ null,
+ null,
+ null,
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator ordered(
@@ -202,7 +253,8 @@ public class MemPointIteratorFactory {
List<AlignedTVList> alignedTvLists,
List<TimeRange> timeColumnDeletion,
List<List<TimeRange>> valueColumnsDeletionList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
return new OrderedMultiAlignedTVListIterator(
tsDataTypes,
columnIndexList,
@@ -211,7 +263,8 @@ public class MemPointIteratorFactory {
valueColumnsDeletionList,
null,
null,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator ordered(
@@ -222,7 +275,8 @@ public class MemPointIteratorFactory {
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
List<TSEncoding> encodingList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
return new OrderedMultiAlignedTVListIterator(
tsDataTypes,
columnIndexList,
@@ -231,27 +285,32 @@ public class MemPointIteratorFactory {
valueColumnsDeletionList,
floatPrecision,
encodingList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
- public static MemPointIterator create(TSDataType tsDataType, List<TVList>
tvLists) {
+ public static MemPointIterator create(
+ TSDataType tsDataType, List<TVList> tvLists, int
maxNumberOfPointsInPage) {
if (tvLists.size() == 1) {
- return single(tvLists);
+ return single(tvLists, maxNumberOfPointsInPage);
} else if (isCompleteOrdered(tvLists)) {
- return ordered(tsDataType, tvLists);
+ return ordered(tsDataType, tvLists, maxNumberOfPointsInPage);
} else {
- return mergeSort(tsDataType, tvLists);
+ return mergeSort(tsDataType, tvLists, maxNumberOfPointsInPage);
}
}
public static MemPointIterator create(
- TSDataType tsDataType, List<TVList> tvLists, List<TimeRange>
deletionList) {
+ TSDataType tsDataType,
+ List<TVList> tvLists,
+ List<TimeRange> deletionList,
+ int maxNumberOfPointsInPage) {
if (tvLists.size() == 1) {
- return single(tvLists, deletionList);
+ return single(tvLists, deletionList, maxNumberOfPointsInPage);
} else if (isCompleteOrdered(tvLists)) {
- return ordered(tsDataType, tvLists, deletionList);
+ return ordered(tsDataType, tvLists, deletionList,
maxNumberOfPointsInPage);
} else {
- return mergeSort(tsDataType, tvLists, deletionList);
+ return mergeSort(tsDataType, tvLists, deletionList,
maxNumberOfPointsInPage);
}
}
@@ -260,13 +319,16 @@ public class MemPointIteratorFactory {
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
if (tvLists.size() == 1) {
- return single(tvLists, deletionList, floatPrecision, encoding);
+ return single(tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
} else if (isCompleteOrdered(tvLists)) {
- return ordered(tsDataType, tvLists, deletionList, floatPrecision,
encoding);
+ return ordered(
+ tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
} else {
- return mergeSort(tsDataType, tvLists, deletionList, floatPrecision,
encoding);
+ return mergeSort(
+ tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
}
@@ -274,13 +336,17 @@ public class MemPointIteratorFactory {
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
if (alignedTvLists.size() == 1) {
- return single(tsDataTypes, columnIndexList, alignedTvLists,
ignoreAllNullRows);
+ return single(
+ tsDataTypes, columnIndexList, alignedTvLists, ignoreAllNullRows,
maxNumberOfPointsInPage);
} else if (isCompleteOrdered(alignedTvLists)) {
- return ordered(tsDataTypes, columnIndexList, alignedTvLists,
ignoreAllNullRows);
+ return ordered(
+ tsDataTypes, columnIndexList, alignedTvLists, ignoreAllNullRows,
maxNumberOfPointsInPage);
} else {
- return mergeSort(tsDataTypes, columnIndexList, alignedTvLists,
ignoreAllNullRows);
+ return mergeSort(
+ tsDataTypes, columnIndexList, alignedTvLists, ignoreAllNullRows,
maxNumberOfPointsInPage);
}
}
@@ -290,7 +356,8 @@ public class MemPointIteratorFactory {
List<AlignedTVList> alignedTvLists,
List<TimeRange> timeColumnDeletion,
List<List<TimeRange>> valueColumnsDeletionList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
if (alignedTvLists.size() == 1) {
return single(
tsDataTypes,
@@ -298,7 +365,8 @@ public class MemPointIteratorFactory {
alignedTvLists,
timeColumnDeletion,
valueColumnsDeletionList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
} else if (isCompleteOrdered(alignedTvLists)) {
return ordered(
tsDataTypes,
@@ -306,7 +374,8 @@ public class MemPointIteratorFactory {
alignedTvLists,
timeColumnDeletion,
valueColumnsDeletionList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
} else {
return mergeSort(
tsDataTypes,
@@ -314,7 +383,8 @@ public class MemPointIteratorFactory {
alignedTvLists,
timeColumnDeletion,
valueColumnsDeletionList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
}
@@ -326,7 +396,8 @@ public class MemPointIteratorFactory {
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
List<TSEncoding> encodingList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
if (alignedTvLists.size() == 1) {
return single(
tsDataTypes,
@@ -336,7 +407,8 @@ public class MemPointIteratorFactory {
valueColumnsDeletionList,
floatPrecision,
encodingList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
} else if (isCompleteOrdered(alignedTvLists)) {
return ordered(
tsDataTypes,
@@ -346,7 +418,8 @@ public class MemPointIteratorFactory {
valueColumnsDeletionList,
floatPrecision,
encodingList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
} else {
return mergeSort(
tsDataTypes,
@@ -356,7 +429,8 @@ public class MemPointIteratorFactory {
valueColumnsDeletionList,
floatPrecision,
encodingList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
index 8ff8597d51d..24654e05012 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.utils.datastructure;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
@@ -54,9 +51,6 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
new PriorityQueue<>(
(a, b) -> a.left.equals(b.left) ? b.right.compareTo(a.right) :
a.left.compareTo(b.left));
- private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private long maxNumberOfPointsInChunk = CONFIG.getTargetChunkPointNum();
-
public MergeSortMultiAlignedTVListIterator(
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
@@ -65,7 +59,8 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
List<TSEncoding> encodingList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
super(
tsDataTypes,
columnIndexList,
@@ -74,7 +69,8 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
valueColumnsDeletionList,
floatPrecision,
encodingList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
this.probeIterators =
IntStream.range(0,
alignedTvListIterators.size()).boxed().collect(Collectors.toSet());
this.bitMap = new BitMap(tsDataTypeList.size());
@@ -85,17 +81,6 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
valueColumnDeleteCursor.add(new int[] {0});
}
this.ignoreAllNullRows = ignoreAllNullRows;
-
- if (!alignedTvLists.isEmpty()) {
- int avgPointSizeOfLargestColumn =
- alignedTvLists.stream()
- .mapToInt(AlignedTVList::getAvgPointSizeOfLargestColumn)
- .max()
- .getAsInt();
- long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
- maxNumberOfPointsInChunk =
- Math.min(maxNumberOfPointsInChunk, (TARGET_CHUNK_SIZE /
avgPointSizeOfLargestColumn));
- }
}
@Override
@@ -260,8 +245,8 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
encodeInfo.pointNumInChunk++;
// new page
- if (encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE
- || encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk) {
+ if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage
+ || encodeInfo.pointNumInChunk >=
encodeInfo.maxNumberOfPointsInChunk) {
alignedChunkWriterImpl.write(times, encodeInfo.pointNumInPage, 0);
encodeInfo.pointNumInPage = 0;
break;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
index 2eb59c416ad..616835082bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.utils.datastructure;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
@@ -39,9 +36,6 @@ import java.util.stream.IntStream;
import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
public class MergeSortMultiTVListIterator extends MultiTVListIterator {
- private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
- private final long MAX_NUMBER_OF_POINTS_IN_CHUNK =
CONFIG.getTargetChunkPointNum();
private final List<Integer> probeIterators;
private final PriorityQueue<Pair<Long, Integer>> minHeap =
@@ -53,8 +47,9 @@ public class MergeSortMultiTVListIterator extends
MultiTVListIterator {
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
- super(tsDataType, tvLists, deletionList, floatPrecision, encoding);
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
+ super(tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
this.probeIterators =
IntStream.range(0,
tvListIterators.size()).boxed().collect(Collectors.toList());
}
@@ -147,8 +142,8 @@ public class MergeSortMultiTVListIterator extends
MultiTVListIterator {
}
encodeInfo.pointNumInChunk++;
- if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
- || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+ if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk
+ || encodeInfo.dataSizeInChunk >= encodeInfo.targetChunkSize) {
break;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
index 1201f54a509..5c4b730af80 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.utils.datastructure;
import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TimeValuePair;
@@ -50,8 +49,8 @@ public abstract class MultiAlignedTVListIterator implements
MemPointIterator {
protected List<TsBlock> tsBlocks;
protected long currentTime;
- protected final int MAX_NUMBER_OF_POINTS_IN_PAGE =
- TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ // used by nextBatch during query
+ protected final int maxNumberOfPointsInPage;
protected MultiAlignedTVListIterator(
List<TSDataType> tsDataTypeList,
@@ -61,7 +60,8 @@ public abstract class MultiAlignedTVListIterator implements
MemPointIterator {
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
List<TSEncoding> encodingList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
this.tsDataTypeList = tsDataTypeList;
this.columnIndexList = columnIndexList;
this.alignedTvListIterators = new ArrayList<>(alignedTvLists.size());
@@ -74,13 +74,15 @@ public abstract class MultiAlignedTVListIterator implements
MemPointIterator {
null,
floatPrecision,
encodingList,
- ignoreAllNullRows));
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage));
}
this.valueColumnsDeletionList = valueColumnsDeletionList;
this.floatPrecision = floatPrecision != null ? floatPrecision : 0;
this.encodingList = encodingList;
this.ignoreAllNullRows = ignoreAllNullRows;
this.tsBlocks = new ArrayList<>();
+ this.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
}
@Override
@@ -135,7 +137,7 @@ public abstract class MultiAlignedTVListIterator implements
MemPointIterator {
// Time column
TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
- while (hasNextTimeValuePair() && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (hasNextTimeValuePair() && builder.getPositionCount() <
maxNumberOfPointsInPage) {
timeBuilder.writeLong(currentTime);
for (int columnIndex = 0; columnIndex < tsDataTypeList.size();
columnIndex++) {
// Value column
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
index d400f629971..aa9ec257311 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.utils.datastructure;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TimeValuePair;
@@ -46,23 +45,25 @@ public abstract class MultiTVListIterator implements
MemPointIterator {
protected int iteratorIndex = 0;
protected int rowIndex = 0;
- protected final int MAX_NUMBER_OF_POINTS_IN_PAGE =
- TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ // used by nextBatch during query
+ protected final int maxNumberOfPointsInPage;
protected MultiTVListIterator(
TSDataType tsDataType,
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
this.tsDataType = tsDataType;
this.tvListIterators = new ArrayList<>(tvLists.size());
for (TVList tvList : tvLists) {
- tvListIterators.add(tvList.iterator(deletionList, null, null));
+ tvListIterators.add(tvList.iterator(deletionList, null, null,
maxNumberOfPointsInPage));
}
this.floatPrecision = floatPrecision != null ? floatPrecision : 0;
this.encoding = encoding;
this.tsBlocks = new ArrayList<>();
+ this.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
}
@Override
@@ -102,7 +103,7 @@ public abstract class MultiTVListIterator implements
MemPointIterator {
@Override
public TsBlock nextBatch() {
TsBlockBuilder builder = new
TsBlockBuilder(Collections.singletonList(tsDataType));
- while (hasNextTimeValuePair() && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (hasNextTimeValuePair() && builder.getPositionCount() <
maxNumberOfPointsInPage) {
TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
builder.getTimeColumnBuilder().writeLong(currentTime);
switch (tsDataType) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
index dc7c35f6de1..11ae7349ed2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
@@ -44,7 +44,8 @@ public class OrderedMultiAlignedTVListIterator extends
MultiAlignedTVListIterato
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
List<TSEncoding> encodingList,
- boolean ignoreAllNullRows) {
+ boolean ignoreAllNullRows,
+ int maxNumberOfPointsInPage) {
super(
tsDataTypes,
columnIndexList,
@@ -53,7 +54,8 @@ public class OrderedMultiAlignedTVListIterator extends
MultiAlignedTVListIterato
valueColumnsDeletionList,
floatPrecision,
encodingList,
- ignoreAllNullRows);
+ ignoreAllNullRows,
+ maxNumberOfPointsInPage);
this.bitMap = new BitMap(tsDataTypeList.size());
this.valueColumnDeleteCursor = new ArrayList<>();
for (int i = 0; i < tsDataTypeList.size(); i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
index e20ae061f75..eeb69ddb95b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
@@ -32,8 +32,9 @@ public class OrderedMultiTVListIterator extends
MultiTVListIterator {
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
- super(tsDataType, tvLists, deletionList, floatPrecision, encoding);
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
+ super(tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 5b0a5382a76..d15fb28fbd7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -21,13 +21,11 @@ package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.MathUtils;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TimeValuePair;
@@ -647,8 +645,11 @@ public abstract class TVList implements WALEntryValue {
}
public TVListIterator iterator(
- List<TimeRange> deletionList, Integer floatPrecision, TSEncoding
encoding) {
- return new TVListIterator(deletionList, floatPrecision, encoding);
+ List<TimeRange> deletionList,
+ Integer floatPrecision,
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
+ return new TVListIterator(deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
/* TVList Iterator */
@@ -663,15 +664,14 @@ public abstract class TVList implements WALEntryValue {
private final int floatPrecision;
private final TSEncoding encoding;
- private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
-
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
- private final long TARGET_CHUNK_SIZE =
- IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
- private final long MAX_NUMBER_OF_POINTS_IN_CHUNK =
- IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
+ // used by nextBatch during query
+ protected final int maxNumberOfPointsInPage;
public TVListIterator(
- List<TimeRange> deletionList, Integer floatPrecision, TSEncoding
encoding) {
+ List<TimeRange> deletionList,
+ Integer floatPrecision,
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
this.deletionList = deletionList;
this.floatPrecision = floatPrecision != null ? floatPrecision : 0;
this.encoding = encoding;
@@ -679,6 +679,7 @@ public abstract class TVList implements WALEntryValue {
this.rows = rowCount;
this.probeNext = false;
this.tsBlocks = new ArrayList<>();
+ this.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
}
protected void prepareNext() {
@@ -741,7 +742,7 @@ public abstract class TVList implements WALEntryValue {
TsBlockBuilder builder = new
TsBlockBuilder(Collections.singletonList(dataType));
switch (dataType) {
case BOOLEAN:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -755,7 +756,7 @@ public abstract class TVList implements WALEntryValue {
break;
case INT32:
case DATE:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -769,7 +770,7 @@ public abstract class TVList implements WALEntryValue {
break;
case INT64:
case TIMESTAMP:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -782,7 +783,7 @@ public abstract class TVList implements WALEntryValue {
}
break;
case FLOAT:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -798,7 +799,7 @@ public abstract class TVList implements WALEntryValue {
}
break;
case DOUBLE:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -816,7 +817,7 @@ public abstract class TVList implements WALEntryValue {
case TEXT:
case BLOB:
case STRING:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -895,8 +896,8 @@ public abstract class TVList implements WALEntryValue {
String.format("Data type %s is not supported.", dataType));
}
encodeInfo.pointNumInChunk++;
- if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
- || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+ if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk
+ || encodeInfo.dataSizeInChunk >= encodeInfo.targetChunkSize) {
break;
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java
index 7ffe075c80b..c106476c889 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java
@@ -57,6 +57,7 @@ import static org.junit.Assert.fail;
public class MemAlignedChunkLoaderTest {
private static final String BINARY_STR = "ty love zm";
+ private static final int maxNumberOfPointsInPage = 1000;
@Test
public void testMemAlignedChunkLoader() throws IOException {
@@ -78,7 +79,6 @@ public class MemAlignedChunkLoaderTest {
Mockito.when(timeStatistics.getCount()).thenReturn(2);
timeStatitsticsList.add(timeStatistics);
Mockito.when(chunk.getTimeStatisticsList()).thenReturn(timeStatitsticsList);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
List<Statistics<? extends Serializable>[]> valuesStatitsticsList = new
ArrayList<>();
Statistics<? extends Serializable>[] valuesStatistics = new Statistics[6];
@@ -115,7 +115,8 @@ public class MemAlignedChunkLoaderTest {
List<AlignedTVList> alignedTvLists =
alignedTvListMap.keySet().stream().map(x -> (AlignedTVList)
x).collect(Collectors.toList());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(dataTypes, null, alignedTvLists, false);
+ MemPointIteratorFactory.create(
+ dataTypes, null, alignedTvLists, false, maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkLoaderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkLoaderTest.java
index 69badb686db..6eddd40e80e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkLoaderTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkLoaderTest.java
@@ -59,17 +59,17 @@ import static org.junit.Assert.fail;
public class MemChunkLoaderTest {
private static final String BINARY_STR = "ty love zm";
+ private static final int maxNumberOfPointsInPage = 1000;
@Test
public void testBooleanMemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.BOOLEAN);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> booleanTvListMap = buildBooleanTvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(booleanTvListMap);
List<TVList> booleanTvLists = new ArrayList<>(booleanTvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.BOOLEAN, booleanTvLists);
+ MemPointIteratorFactory.create(TSDataType.BOOLEAN, booleanTvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
@@ -144,12 +144,11 @@ public class MemChunkLoaderTest {
public void testInt32MemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.INT32);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> int32TvListMap = buildInt32TvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(int32TvListMap);
List<TVList> int32TvLists = new ArrayList<>(int32TvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.INT32, int32TvLists);
+ MemPointIteratorFactory.create(TSDataType.INT32, int32TvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
@@ -224,12 +223,11 @@ public class MemChunkLoaderTest {
public void testInt64MemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.INT64);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> int64TvListMap = buildInt64TvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(int64TvListMap);
List<TVList> int64TvLists = new ArrayList<>(int64TvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.INT64, int64TvLists);
+ MemPointIteratorFactory.create(TSDataType.INT64, int64TvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
@@ -304,12 +302,11 @@ public class MemChunkLoaderTest {
public void testFloatMemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.FLOAT);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> floatTvListMap = buildFloatTvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(floatTvListMap);
List<TVList> floatTvLists = new ArrayList<>(floatTvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.FLOAT, floatTvLists);
+ MemPointIteratorFactory.create(TSDataType.FLOAT, floatTvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
@@ -384,12 +381,11 @@ public class MemChunkLoaderTest {
public void testDoubleMemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.DOUBLE);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> doubleTvListMap = buildDoubleTvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(doubleTvListMap);
List<TVList> doubleTvLists = new ArrayList<>(doubleTvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.DOUBLE, doubleTvLists);
+ MemPointIteratorFactory.create(TSDataType.DOUBLE, doubleTvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
@@ -464,12 +460,11 @@ public class MemChunkLoaderTest {
public void testTextMemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.TEXT);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> textTvListMap = buildTextTvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(textTvListMap);
List<TVList> textTvLists = new ArrayList<>(textTvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.TEXT, textTvLists);
+ MemPointIteratorFactory.create(TSDataType.TEXT, textTvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);