This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/scanOpBatchProcess1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fee78afd5b3b98a40852bd7d15822773abe610c6 Author: liuminghui233 <[email protected]> AuthorDate: Mon Nov 28 10:31:40 2022 +0800 tmp save before run --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 8 +- .../source/AbstractSeriesScanOperator.java | 156 ++++++++++++ .../operator/source/AlignedSeriesScanUtil.java | 2 + .../execution/operator/source/SeriesScanUtil.java | 269 ++++++++++++++++++++- .../query/reader/chunk/MemAlignedPageReader.java | 7 +- .../iotdb/db/query/reader/chunk/MemPageReader.java | 8 +- .../tsfile/read/common/block/TsBlockBuilder.java | 2 +- .../iotdb/tsfile/read/reader/IPageReader.java | 25 ++ .../tsfile/read/reader/page/AlignedPageReader.java | 7 +- .../iotdb/tsfile/read/reader/page/PageReader.java | 7 +- 10 files changed, 480 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 0f923aef2e..be61fc6f91 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -397,13 +397,13 @@ public class IoTDBConfig { private int avgSeriesPointNumberThreshold = 100000; /** Enable inner space compaction for sequence files */ - private boolean enableSeqSpaceCompaction = true; + private boolean enableSeqSpaceCompaction = false; /** Enable inner space compaction for unsequence files */ - private boolean enableUnseqSpaceCompaction = true; + private boolean enableUnseqSpaceCompaction = false; /** Compact the unsequence files into the overlapped sequence files */ - private boolean enableCrossSpaceCompaction = true; + private boolean enableCrossSpaceCompaction = false; /** * The strategy of inner space compaction task. There are just one inner space compaction strategy @@ -560,7 +560,7 @@ public class IoTDBConfig { private long cacheFileReaderClearPeriod = 100000; /** the max executing time of query in ms. Unit: millisecond */ - private long queryTimeoutThreshold = 60000; + private long queryTimeoutThreshold = 600000; /** the max time to live of a session in ms. Unit: millisecond */ private int sessionTimeoutThreshold = 0; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java new file mode 100644 index 0000000000..ae3191626a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.execution.operator.source; + +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractSeriesScanOperator implements DataSourceOperator { + + private final OperatorContext operatorContext; + private final SeriesScanUtil seriesScanUtil; + private final PlanNodeId sourceId; + + private boolean finished = false; + + private final TsBlockBuilder resultBuilder; + + private final long maxReturnSize; + + public AbstractSeriesScanOperator( + PlanNodeId sourceId, + SeriesScanUtil seriesScanUtil, + int subSensorSize, + OperatorContext context) { + this.sourceId = sourceId; + this.operatorContext = context; + this.seriesScanUtil = seriesScanUtil; + this.resultBuilder = seriesScanUtil.getCachedTsBlockBuilder(); + + // time + all value columns + this.maxReturnSize = + (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() { + TsBlock block = resultBuilder.build(); + resultBuilder.reset(); + return block; + } + + @Override + public boolean hasNext() { + try { + // start stopwatch + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + + // here use do-while to promise doing this at least once + do { + // consume page data firstly + if (readPageData()) { + continue; + } + + // consume chunk data secondly + if (readChunkData()) { + continue; + } + + // consume next file finally + if (readFileData()) { + continue; + } + break; + + } while (System.nanoTime() - start < maxRuntime && !resultBuilder.isFull()); + + finished = resultBuilder.isEmpty(); + return !finished; + } catch (IOException e) { + throw new RuntimeException("Error happened while scanning the file", e); + } + } + + @Override + public boolean isFinished() { + return finished; + } + + @Override + public long calculateMaxPeekMemory() { + return maxReturnSize; + } + + @Override + public long calculateMaxReturnSize() { + return maxReturnSize; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0L; + } + + private boolean readFileData() throws IOException { + while (seriesScanUtil.hasNextFile()) { + if (readChunkData()) { + return true; + } + } + return false; + } + + private boolean readChunkData() throws IOException { + while (seriesScanUtil.hasNextChunk()) { + if (readPageData()) { + return true; + } + } + return false; + } + + private boolean readPageData() throws IOException { + return seriesScanUtil.tryToFetchDataFromPage(); + } + + @Override + public PlanNodeId getSourceId() { + return sourceId; + } + + @Override + public void initQueryDataSource(QueryDataSource dataSource) { + seriesScanUtil.initQueryDataSource(dataSource); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java index f41456b063..6cc0689003 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanUtil.java @@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; @@ -57,6 +58,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { dataTypes = ((AlignedPath) seriesPath) .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); + cachedTsBlockBuilder = new TsBlockBuilder(dataTypes); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java index 22fd2cbabc..b667d08858 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanUtil.java @@ -70,7 +70,7 @@ public class SeriesScanUtil { protected final TSDataType dataType; // inner class of SeriesReader for order purpose - private TimeOrderUtils orderUtils; + private final TimeOrderUtils orderUtils; /* * There is at most one is not null between timeFilter and valueFilter @@ -121,6 +121,8 @@ public class SeriesScanUtil { protected boolean hasCachedNextOverlappedPage; protected TsBlock cachedTsBlock; + protected TsBlockBuilder cachedTsBlockBuilder; + public SeriesScanUtil( PartialPath seriesPath, Set<String> allSensors, @@ -156,6 +158,10 @@ public class SeriesScanUtil { new PriorityQueue<>( orderUtils.comparingLong( versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics()))); + + if (dataType != TSDataType.VECTOR) { + this.cachedTsBlockBuilder = new TsBlockBuilder(Collections.singletonList(dataType)); + } } public void initQueryDataSource(QueryDataSource dataSource) { @@ -450,6 +456,259 @@ public class SeriesScanUtil { return firstPageReader != null; } + public boolean tryToFetchDataFromPage() throws IOException { + + /* + * has overlapped data + */ + if (mergeReader.hasNextTimeValuePair() || firstPageOverlapped()) { + if (tryToBuildFromMergeReader()) { + return true; + } + } + + if (firstPageReader != null) { + buildFromPageReader(); + return true; + } + + /* + * construct first page reader + */ + if (firstChunkMetadata != null) { + /* + * try to unpack all overlapped ChunkMetadata to cachedPageReaders + */ + unpackAllOverlappedChunkMetadataToPageReaders( + orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), true); + } else { + /* + * first chunk metadata is already unpacked, consume cached pages + */ + initFirstPageReader(); + } + + if (tryToBuildFromOverlappedPage()) { + return true; + } + + // make sure firstPageReader won't be null while the unSeqPageReaders has more cached page + // readers + while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) { + + initFirstPageReader(); + + if (tryToBuildFromOverlappedPage()) { + return true; + } + } + + if (firstPageReader != null) { + buildFromPageReader(); + return true; + } + return false; + } + + private boolean tryToBuildFromOverlappedPage() throws IOException { + if (firstPageOverlapped()) { + // next page is overlapped, read overlapped data and cache it + return tryToBuildFromMergeReader(); + } + return false; + } + + public void buildFromPageReader() throws IOException { + /* + * next page is not overlapped, push down value filter if it exists + */ + if (valueFilter != null) { + firstPageReader.setFilter(valueFilter); + } + firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending(), cachedTsBlockBuilder); + firstPageReader = null; + } + + private boolean tryToBuildFromMergeReader() throws IOException { + int rawSize = cachedTsBlockBuilder.getPositionCount(); + + tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); + + while (true) { + + // may has overlapped data + if (mergeReader.hasNextTimeValuePair()) { + + // TODO we still need to consider data type, ascending and descending here + TimeColumnBuilder timeBuilder = cachedTsBlockBuilder.getTimeColumnBuilder(); + long currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); + while (mergeReader.hasNextTimeValuePair()) { + + /* + * get current first point in mergeReader, this maybe overlapped later + */ + TimeValuePair timeValuePair = mergeReader.currentTimeValuePair(); + + if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) { + /* + * when the merged point excesses the currentPageEndPointTime, we have read all overlapped data before currentPageEndPointTime + * 1. has cached batch data, we don't need to read more data, just use the cached data later + * 2. has first page reader, which means first page reader last endTime < currentTimeValuePair.getTimestamp(), + * we could just use the first page reader later + * 3. sequence page reader is not empty, which means first page reader last endTime < currentTimeValuePair.getTimestamp(), + * we could use the first sequence page reader later + */ + if (cachedTsBlockBuilder.getPositionCount() > rawSize + || firstPageReader != null + || !seqPageReaders.isEmpty()) { + break; + } + // so, we don't have other data except mergeReader + currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); + } + + // unpack all overlapped data for the first timeValuePair + unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp()); + unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( + timeValuePair.getTimestamp(), false); + unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false); + unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp()); + + // update if there are unpacked unSeqPageReaders + timeValuePair = mergeReader.currentTimeValuePair(); + + // from now, the unsequence reader is all unpacked, so we don't need to consider it + // we has first page reader now + if (firstPageReader != null) { + // if current timeValuePair excesses the first page reader's end time, we just use the + // cached data + if ((orderUtils.getAscending() + && timeValuePair.getTimestamp() > firstPageReader.getStatistics().getEndTime()) + || (!orderUtils.getAscending() + && timeValuePair.getTimestamp() + < firstPageReader.getStatistics().getStartTime())) { + return cachedTsBlockBuilder.getPositionCount() > rawSize; + } else if (orderUtils.isOverlapped( + timeValuePair.getTimestamp(), firstPageReader.getStatistics())) { + // current timeValuePair is overlapped with firstPageReader, add it to merged reader + // and update endTime to the max end time + mergeReader.addReader( + getPointReader( + firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())), + firstPageReader.version, + orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()), + context); + currentPageEndPointTime = + updateEndPointTime(currentPageEndPointTime, firstPageReader); + firstPageReader = null; + } + } + + // the seq page readers is not empty, just like first page reader + if (!seqPageReaders.isEmpty()) { + if ((orderUtils.getAscending() + && timeValuePair.getTimestamp() + > seqPageReaders.get(0).getStatistics().getEndTime()) + || (!orderUtils.getAscending() + && timeValuePair.getTimestamp() + < seqPageReaders.get(0).getStatistics().getStartTime())) { + return cachedTsBlockBuilder.getPositionCount() > rawSize; + } else if (orderUtils.isOverlapped( + timeValuePair.getTimestamp(), seqPageReaders.get(0).getStatistics())) { + VersionPageReader pageReader = seqPageReaders.remove(0); + mergeReader.addReader( + getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())), + pageReader.version, + orderUtils.getOverlapCheckTime(pageReader.getStatistics()), + context); + currentPageEndPointTime = updateEndPointTime(currentPageEndPointTime, pageReader); + } + } + + /* + * get the latest first point in mergeReader + */ + timeValuePair = mergeReader.nextTimeValuePair(); + + Object valueForFilter = timeValuePair.getValue().getValue(); + + // TODO fix value filter firstNotNullObject, currently, if it's a value filter, it will + // only accept AlignedPath with only one sub sensor + if (timeValuePair.getValue().getDataType() == TSDataType.VECTOR) { + for (TsPrimitiveType tsPrimitiveType : timeValuePair.getValue().getVector()) { + if (tsPrimitiveType != null) { + valueForFilter = tsPrimitiveType.getValue(); + break; + } + } + } + + if (valueFilter == null + || valueFilter.satisfy(timeValuePair.getTimestamp(), valueForFilter)) { + timeBuilder.writeLong(timeValuePair.getTimestamp()); + switch (dataType) { + case BOOLEAN: + cachedTsBlockBuilder + .getColumnBuilder(0) + .writeBoolean(timeValuePair.getValue().getBoolean()); + break; + case INT32: + cachedTsBlockBuilder + .getColumnBuilder(0) + .writeInt(timeValuePair.getValue().getInt()); + break; + case INT64: + cachedTsBlockBuilder + .getColumnBuilder(0) + .writeLong(timeValuePair.getValue().getLong()); + break; + case FLOAT: + cachedTsBlockBuilder + .getColumnBuilder(0) + .writeFloat(timeValuePair.getValue().getFloat()); + break; + case DOUBLE: + cachedTsBlockBuilder + .getColumnBuilder(0) + .writeDouble(timeValuePair.getValue().getDouble()); + break; + case TEXT: + cachedTsBlockBuilder + .getColumnBuilder(0) + .writeBinary(timeValuePair.getValue().getBinary()); + break; + case VECTOR: + TsPrimitiveType[] values = timeValuePair.getValue().getVector(); + for (int i = 0; i < values.length; i++) { + if (values[i] == null) { + cachedTsBlockBuilder.getColumnBuilder(i).appendNull(); + } else { + cachedTsBlockBuilder.getColumnBuilder(i).writeTsPrimitiveType(values[i]); + } + } + break; + default: + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } + cachedTsBlockBuilder.declarePosition(); + } + } + + /* + * if current overlapped page has valid data, return, otherwise read next overlapped page + */ + if (cachedTsBlockBuilder.getPositionCount() > rawSize) { + return true; + } else if (mergeReader.hasNextTimeValuePair()) { + // condition: seqPage.endTime < mergeReader.currentTime + return false; + } + } else { + return false; + } + } + } + private boolean isExistOverlappedPage() throws IOException { if (firstPageOverlapped()) { /* @@ -1088,6 +1347,10 @@ public class SeriesScanUtil { return orderUtils; } + public TsBlockBuilder getCachedTsBlockBuilder() { + return cachedTsBlockBuilder; + } + protected class VersionPageReader { protected PriorityMergeReader.MergeReaderPriority version; @@ -1131,6 +1394,10 @@ public class SeriesScanUtil { return tsBlock; } + void getAllSatisfiedPageData(boolean ascending, TsBlockBuilder builder) throws IOException { + data.getAllSatisfiedData(ascending, builder); + } + void setFilter(Filter filter) { data.setFilter(filter); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java index 41f4d7c1fd..ae13babf9f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java @@ -88,7 +88,12 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { @Override public TsBlock getAllSatisfiedData() { builder.reset(); + writeDataToBuilder(builder); + return builder.build(); + } + @Override + public void writeDataToBuilder(TsBlockBuilder builder) { boolean[] satisfyInfo = new boolean[tsBlock.getPositionCount()]; for (int row = 0; row < tsBlock.getPositionCount(); row++) { @@ -130,8 +135,6 @@ public class MemAlignedPageReader implements IPageReader, IAlignedPageReader { } } } - - return builder.build(); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java index 0baf315eff..85f08de1c8 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java @@ -92,6 +92,13 @@ public class MemPageReader implements IPageReader { public TsBlock getAllSatisfiedData() { TSDataType dataType = chunkMetadata.getDataType(); TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType)); + writeDataToBuilder(builder); + return builder.build(); + } + + @Override + public void writeDataToBuilder(TsBlockBuilder builder) { + TSDataType dataType = chunkMetadata.getDataType(); TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); ColumnBuilder valueBuilder = builder.getColumnBuilder(0); switch (dataType) { @@ -164,7 +171,6 @@ public class MemPageReader implements IPageReader { default: throw new UnSupportedDataTypeException(String.valueOf(dataType)); } - return builder.build(); } @Override diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java index d4152d4ddb..c309835a09 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java @@ -259,7 +259,7 @@ public class TsBlockBuilder { } public boolean isFull() { - return declaredPositions == MAX_LINE_NUMBER || tsBlockBuilderStatus.isFull(); + return declaredPositions >= MAX_LINE_NUMBER || tsBlockBuilderStatus.isFull(); } public boolean isEmpty() { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java index a68f4590b1..a967cae19c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPageReader.java @@ -22,6 +22,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import java.io.IOException; @@ -37,6 +40,28 @@ public interface IPageReader { TsBlock getAllSatisfiedData() throws IOException; + void writeDataToBuilder(TsBlockBuilder builder) throws IOException; + + default void getAllSatisfiedData(boolean ascending, TsBlockBuilder builder) throws IOException { + if (ascending) { + writeDataToBuilder(builder); + } else { + TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder(); + ColumnBuilder[] valueColumnBuilders = builder.getValueColumnBuilders(); + int columnNum = valueColumnBuilders.length; + + TsBlock tsBlock = getAllSatisfiedData(); + tsBlock.reverse(); + for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) { + timeColumnBuilder.write(tsBlock.getTimeColumn(), i); + for (int columnIndex = 0; columnIndex < columnNum; columnIndex++) { + valueColumnBuilders[columnIndex].write(tsBlock.getColumn(columnIndex), i); + } + builder.declarePosition(); + } + } + } + Statistics getStatistics(); void setFilter(Filter filter); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java index 3f076906bb..faae955198 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/AlignedPageReader.java @@ -116,6 +116,12 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { @Override public TsBlock getAllSatisfiedData() throws IOException { builder.reset(); + writeDataToBuilder(builder); + return builder.build(); + } + + @Override + public void writeDataToBuilder(TsBlockBuilder builder) throws IOException { long[] timeBatch = timePageReader.getNextTimeBatch(); // if all the sub sensors' value are null in current row, just discard it @@ -189,7 +195,6 @@ public class AlignedPageReader implements IPageReader, IAlignedPageReader { } } } - return builder.build(); } public void setDeleteIntervalList(List<List<TimeRange>> list) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java index e1fce8ff65..190bb11b5c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java @@ -161,6 +161,12 @@ public class PageReader implements IPageReader { @Override public TsBlock getAllSatisfiedData() throws IOException { TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(dataType)); + writeDataToBuilder(builder); + return builder.build(); + } + + @Override + public void writeDataToBuilder(TsBlockBuilder builder) throws IOException { TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder(); ColumnBuilder valueBuilder = builder.getColumnBuilder(0); if (filter == null || filter.satisfy(getStatistics())) { @@ -235,7 +241,6 @@ public class PageReader implements IPageReader { throw new UnSupportedDataTypeException(String.valueOf(dataType)); } } - return builder.build(); } @Override
