This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/scanOpBatchProcess1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ceb9331558b142f3d470916372f32e5653d73f5c Author: Minghui Liu <[email protected]> AuthorDate: Wed Nov 30 22:56:58 2022 +0800 fix maxReturnSize --- .../source/AbstractSeriesScanOperator.java | 10 +- .../operator/source/AlignedSeriesScanOperator.java | 180 +-------------------- .../operator/source/SeriesScanOperator.java | 174 +------------------- 3 files changed, 16 insertions(+), 348 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java index ae3191626a..98c3dc7d3f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesScanOperator.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; @@ -44,16 +43,13 @@ public abstract class AbstractSeriesScanOperator implements DataSourceOperator { public AbstractSeriesScanOperator( PlanNodeId sourceId, SeriesScanUtil seriesScanUtil, - int subSensorSize, - OperatorContext context) { + OperatorContext context, + long maxReturnSize) { this.sourceId = sourceId; this.operatorContext = context; this.seriesScanUtil = seriesScanUtil; this.resultBuilder = seriesScanUtil.getCachedTsBlockBuilder(); - - // time + all value columns - this.maxReturnSize = - (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + this.maxReturnSize = maxReturnSize; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java index 9caaeeac60..482bcb5001 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesScanOperator.java @@ -19,32 +19,14 @@ package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.commons.path.AlignedPath; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; -import org.apache.iotdb.tsfile.read.common.block.column.Column; -import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; -import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; -import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.iotdb.tsfile.read.filter.basic.Filter; -import java.io.IOException; import java.util.HashSet; -import java.util.concurrent.TimeUnit; -public class AlignedSeriesScanOperator implements DataSourceOperator { - - private final OperatorContext operatorContext; - private final AlignedSeriesScanUtil seriesScanUtil; - private final PlanNodeId sourceId; - - private final TsBlockBuilder builder; - private boolean finished = false; - - private final long maxReturnSize; +public class AlignedSeriesScanOperator extends AbstractSeriesScanOperator { public AlignedSeriesScanOperator( PlanNodeId sourceId, @@ -53,166 +35,18 @@ public class AlignedSeriesScanOperator implements DataSourceOperator { Filter timeFilter, Filter valueFilter, boolean ascending) { - this.sourceId = sourceId; - this.operatorContext = context; - this.seriesScanUtil = + super( + sourceId, new AlignedSeriesScanUtil( seriesPath, new HashSet<>(seriesPath.getMeasurementList()), context.getInstanceContext(), timeFilter, valueFilter, - ascending); - // time + all value columns - this.maxReturnSize = + ascending), + context, + // time + all value columns (1L + seriesPath.getMeasurementList().size()) - * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); - this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList()); - } - - @Override - public OperatorContext getOperatorContext() { - return operatorContext; - } - - @Override - public TsBlock next() { - TsBlock block = builder.build(); - builder.reset(); - return block; - } - - @Override - public boolean hasNext() { - try { - - // start stopwatch - long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); - long start = System.nanoTime(); - - // here use do-while to promise doing this at least once - do { - /* - * consume page data firstly - */ - if (readPageData()) { - continue; - } - - /* - * consume chunk data secondly - */ - if (readChunkData()) { - continue; - } - - /* - * consume next file finally - */ - if (readFileData()) { - continue; - } - break; - - } while (System.nanoTime() - start < maxRuntime && !builder.isFull()); - - finished = builder.isEmpty(); - - return !finished; - } catch (IOException e) { - throw new RuntimeException("Error happened while scanning the file", e); - } - } - - @Override - public boolean isFinished() { - return finished; - } - - @Override - public long calculateMaxPeekMemory() { - return maxReturnSize; - } - - @Override - public long calculateMaxReturnSize() { - return maxReturnSize; - } - - @Override - public long calculateRetainedSizeAfterCallingNext() { - return 0L; - } - - private boolean readFileData() throws IOException { - while (seriesScanUtil.hasNextFile()) { - if (readChunkData()) { - return true; - } - } - return false; - } - - private boolean readChunkData() throws IOException { - while (seriesScanUtil.hasNextChunk()) { - if (readPageData()) { - return true; - } - } - return false; - } - - private boolean readPageData() throws IOException { - while (seriesScanUtil.hasNextPage()) { - TsBlock tsBlock = seriesScanUtil.nextPage(); - if (!isEmpty(tsBlock)) { - appendToBuilder(tsBlock); - return true; - } - } - return false; - } - - private void appendToBuilder(TsBlock tsBlock) { - int size = tsBlock.getPositionCount(); - TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder(); - TimeColumn timeColumn = tsBlock.getTimeColumn(); - for (int i = 0; i < size; i++) { - timeColumnBuilder.writeLong(timeColumn.getLong(i)); - builder.declarePosition(); - } - for (int columnIndex = 0, columnSize = tsBlock.getValueColumnCount(); - columnIndex < columnSize; - columnIndex++) { - ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex); - Column column = tsBlock.getColumn(columnIndex); - if (column.mayHaveNull()) { - for (int i = 0; i < size; i++) { - if (column.isNull(i)) { - columnBuilder.appendNull(); - } else { - columnBuilder.write(column, i); - } - } - } else { - for (int i = 0; i < size; i++) { - columnBuilder.write(column, i); - } - } - } - } - - private boolean isEmpty(TsBlock tsBlock) { - return tsBlock == null || tsBlock.isEmpty(); - } - - @Override - public PlanNodeId getSourceId() { - return sourceId; - } - - @Override - public void initQueryDataSource(QueryDataSource dataSource) { - seriesScanUtil.initQueryDataSource(dataSource); + * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java index fd831fcd6a..ead57a4943 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesScanOperator.java @@ -19,33 +19,15 @@ package org.apache.iotdb.db.mpp.execution.operator.source; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; -import org.apache.iotdb.tsfile.read.common.block.column.Column; -import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; -import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; -import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.iotdb.tsfile.read.filter.basic.Filter; -import java.io.IOException; import java.util.Set; -import java.util.concurrent.TimeUnit; -public class SeriesScanOperator implements DataSourceOperator { - - private final OperatorContext operatorContext; - private final SeriesScanUtil seriesScanUtil; - private final PlanNodeId sourceId; - private final TsBlockBuilder builder; - - private boolean finished = false; - - private final long maxReturnSize; +public class SeriesScanOperator extends AbstractSeriesScanOperator { public SeriesScanOperator( PlanNodeId sourceId, @@ -56,9 +38,8 @@ public class SeriesScanOperator implements DataSourceOperator { Filter timeFilter, Filter valueFilter, boolean ascending) { - this.sourceId = sourceId; - this.operatorContext = context; - this.seriesScanUtil = + super( + sourceId, new SeriesScanUtil( seriesPath, allSensors, @@ -66,151 +47,8 @@ public class SeriesScanOperator implements DataSourceOperator { context.getInstanceContext(), timeFilter, valueFilter, - ascending); - this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); - this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList()); - } - - @Override - public OperatorContext getOperatorContext() { - return operatorContext; - } - - @Override - public TsBlock next() { - TsBlock block = builder.build(); - builder.reset(); - return block; - } - - @Override - public boolean hasNext() { - try { - - // start stopwatch - long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); - long start = System.nanoTime(); - - // here use do-while to promise doing this at least once - do { - /* - * consume page data firstly - */ - if (readPageData()) { - continue; - } - - /* - * consume chunk data secondly - */ - if (readChunkData()) { - continue; - } - - /* - * consume next file finally - */ - if (readFileData()) { - continue; - } - break; - - } while (System.nanoTime() - start < maxRuntime && !builder.isFull()); - - finished = builder.isEmpty(); - - return !finished; - } catch (IOException e) { - throw new RuntimeException("Error happened while scanning the file", e); - } - } - - @Override - public boolean isFinished() { - return finished; - } - - @Override - public long calculateMaxPeekMemory() { - return maxReturnSize; - } - - @Override - public long calculateMaxReturnSize() { - return maxReturnSize; - } - - @Override - public long calculateRetainedSizeAfterCallingNext() { - return 0L; - } - - private boolean readFileData() throws IOException { - while (seriesScanUtil.hasNextFile()) { - if (readChunkData()) { - return true; - } - } - return false; - } - - private boolean readChunkData() throws IOException { - while (seriesScanUtil.hasNextChunk()) { - if (readPageData()) { - return true; - } - } - return false; - } - - private boolean readPageData() throws IOException { - while (seriesScanUtil.hasNextPage()) { - TsBlock tsBlock = seriesScanUtil.nextPage(); - - if (!isEmpty(tsBlock)) { - appendToBuilder(tsBlock); - return true; - } - } - return false; - } - - private void appendToBuilder(TsBlock tsBlock) { - TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder(); - TimeColumn timeColumn = tsBlock.getTimeColumn(); - ColumnBuilder columnBuilder = builder.getColumnBuilder(0); - Column column = tsBlock.getColumn(0); - - if (column.mayHaveNull()) { - for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) { - timeColumnBuilder.writeLong(timeColumn.getLong(i)); - if (column.isNull(i)) { - columnBuilder.appendNull(); - } else { - columnBuilder.write(column, i); - } - builder.declarePosition(); - } - } else { - for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) { - timeColumnBuilder.writeLong(timeColumn.getLong(i)); - columnBuilder.write(column, i); - builder.declarePosition(); - } - } - } - - private boolean isEmpty(TsBlock tsBlock) { - return tsBlock == null || tsBlock.isEmpty(); - } - - @Override - public PlanNodeId getSourceId() { - return sourceId; - } - - @Override - public void initQueryDataSource(QueryDataSource dataSource) { - seriesScanUtil.initQueryDataSource(dataSource); + ascending), + context, + TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()); } }
