This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new acaad858542 [IOTDB-6339] Optimize the time slice control of SeriesScanOperator and AlignedSeriesScanOperator
acaad858542 is described below
commit acaad8585429c4e1ea44d1d1bc4096a7b975ac3a
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 18 21:33:49 2024 +0800
[IOTDB-6339] Optimize the time slice control of SeriesScanOperator and AlignedSeriesScanOperator
---
...erator.java => AbstractSeriesScanOperator.java} | 123 +++++++--------------
.../operator/source/AlignedSeriesScanOperator.java | 110 +-----------------
.../operator/source/SeriesScanOperator.java | 111 +------------------
3 files changed, 44 insertions(+), 300 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
similarity index 51%
copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
index dc8b3c877a5..d8c07d75caf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
@@ -19,56 +19,36 @@
package org.apache.iotdb.db.queryengine.execution.operator.source;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
-import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
-import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
-
-import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
-import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
-import org.apache.tsfile.utils.RamUsageEstimator;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
-public class SeriesScanOperator extends AbstractDataSourceOperator {
- private static final long INSTANCE_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(SeriesScanOperator.class);
+public abstract class AbstractSeriesScanOperator extends
AbstractDataSourceOperator {
private boolean finished = false;
- public SeriesScanOperator(
- OperatorContext context,
- PlanNodeId sourceId,
- PartialPath seriesPath,
- Ordering scanOrder,
- SeriesScanOptions seriesScanOptions) {
- this.sourceId = sourceId;
- this.operatorContext = context;
- this.seriesScanUtil =
- new SeriesScanUtil(seriesPath, scanOrder, seriesScanOptions,
context.getInstanceContext());
- this.maxReturnSize =
- Math.min(maxReturnSize,
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
- }
-
@Override
public TsBlock next() throws Exception {
if (retainedTsBlock != null) {
return getResultFromRetainedTsBlock();
}
+ // we don't get any data in current batch time slice, just return null
+ if (resultTsBlockBuilder.isEmpty()) {
+ return null;
+ }
resultTsBlock = resultTsBlockBuilder.build();
resultTsBlockBuilder.reset();
return checkTsBlockSizeAndGetResult();
}
+ @Override
+ public boolean isFinished() throws Exception {
+ return finished;
+ }
+
@SuppressWarnings("squid:S112")
@Override
public boolean hasNext() throws Exception {
@@ -81,6 +61,8 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
+ boolean noMoreData = false;
+
// here use do-while to promise doing this at least once
do {
/*
@@ -89,11 +71,15 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
* 3. consume next file finally
*/
if (!readPageData() && !readChunkData() && !readFileData()) {
+ noMoreData = true;
break;
}
- } while (System.nanoTime() - start < maxRuntime &&
!resultTsBlockBuilder.isFull());
- finished = resultTsBlockBuilder.isEmpty();
+ } while (System.nanoTime() - start < maxRuntime
+ && !resultTsBlockBuilder.isFull()
+ && retainedTsBlock == null);
+
+ finished = (resultTsBlockBuilder.isEmpty() && retainedTsBlock == null &&
noMoreData);
return !finished;
} catch (IOException e) {
@@ -101,27 +87,6 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
}
}
- @Override
- public boolean isFinished() throws Exception {
- return finished;
- }
-
- @Override
- public long calculateMaxPeekMemory() {
- return Math.max(
- maxReturnSize,
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L);
- }
-
- @Override
- public long calculateMaxReturnSize() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize();
- }
-
private boolean readFileData() throws IOException {
while (seriesScanUtil.hasNextFile()) {
if (readChunkData()) {
@@ -141,45 +106,30 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
}
private boolean readPageData() throws IOException {
- while (seriesScanUtil.hasNextPage()) {
+ if (seriesScanUtil.hasNextPage()) {
TsBlock tsBlock = seriesScanUtil.nextPage();
-
if (!isEmpty(tsBlock)) {
appendToBuilder(tsBlock);
- return true;
}
+ return true;
}
return false;
}
+ private boolean isEmpty(TsBlock tsBlock) {
+ return tsBlock == null || tsBlock.isEmpty();
+ }
+
private void appendToBuilder(TsBlock tsBlock) {
- TimeColumnBuilder timeColumnBuilder =
resultTsBlockBuilder.getTimeColumnBuilder();
- TimeColumn timeColumn = tsBlock.getTimeColumn();
- ColumnBuilder columnBuilder = resultTsBlockBuilder.getColumnBuilder(0);
- Column column = tsBlock.getColumn(0);
-
- if (column.mayHaveNull()) {
- for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
- timeColumnBuilder.writeLong(timeColumn.getLong(i));
- if (column.isNull(i)) {
- columnBuilder.appendNull();
- } else {
- columnBuilder.write(column, i);
- }
- resultTsBlockBuilder.declarePosition();
- }
- } else {
- for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
- timeColumnBuilder.writeLong(timeColumn.getLong(i));
- columnBuilder.write(column, i);
- resultTsBlockBuilder.declarePosition();
- }
+ int size = tsBlock.getPositionCount();
+ if (resultTsBlockBuilder.isEmpty() && size >=
resultTsBlockBuilder.getMaxTsBlockLineNumber()) {
+ retainedTsBlock = tsBlock;
+ return;
}
+ buildResult(tsBlock);
}
- private boolean isEmpty(TsBlock tsBlock) {
- return tsBlock == null || tsBlock.isEmpty();
- }
+ protected abstract void buildResult(TsBlock tsBlock);
@Override
protected List<TSDataType> getResultDataTypes() {
@@ -187,11 +137,12 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
}
@Override
- public long ramBytesUsed() {
- return INSTANCE_SIZE
- +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil)
- +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
- + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId)
- + (resultTsBlockBuilder == null ? 0 :
resultTsBlockBuilder.getRetainedSizeInBytes());
+ public long calculateMaxReturnSize() {
+ return maxReturnSize;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
index 22cb94b9ec1..95ea59abac4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
@@ -38,16 +38,13 @@ import org.apache.tsfile.read.common.block.column.TimeColumn;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.utils.RamUsageEstimator;
-import java.io.IOException;
import java.util.List;
-import java.util.concurrent.TimeUnit;
-public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
+public class AlignedSeriesScanOperator extends AbstractSeriesScanOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(AlignedSeriesScanOperator.class);
private final int valueColumnCount;
- private boolean finished = false;
private int maxTsBlockLineNum = -1;
public AlignedSeriesScanOperator(
@@ -78,56 +75,6 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
this.maxTsBlockLineNum = maxTsBlockLineNum;
}
- @Override
- public TsBlock next() throws Exception {
- if (retainedTsBlock != null) {
- return getResultFromRetainedTsBlock();
- }
- resultTsBlock = resultTsBlockBuilder.build();
- resultTsBlockBuilder.reset();
- return checkTsBlockSizeAndGetResult();
- }
-
- @SuppressWarnings("squid:S112")
- @Override
- public boolean hasNext() throws Exception {
- if (retainedTsBlock != null) {
- return true;
- }
- try {
-
- // start stopwatch
- long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
- long start = System.nanoTime();
-
- // here use do-while to promise doing this at least once
- do {
- /*
- * 1. consume page data firstly
- * 2. consume chunk data secondly
- * 3. consume next file finally
- */
- if (!readPageData() && !readChunkData() && !readFileData()) {
- break;
- }
-
- } while (System.nanoTime() - start < maxRuntime
- && !resultTsBlockBuilder.isFull()
- && retainedTsBlock == null);
-
- finished = (resultTsBlockBuilder.isEmpty() && retainedTsBlock == null);
-
- return !finished;
- } catch (IOException e) {
- throw new RuntimeException("Error happened while scanning the file", e);
- }
- }
-
- @Override
- public boolean isFinished() throws Exception {
- return finished;
- }
-
@Override
public long calculateMaxPeekMemory() {
return Math.max(
@@ -138,51 +85,8 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
}
@Override
- public long calculateMaxReturnSize() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize();
- }
-
- private boolean readFileData() throws IOException {
- while (seriesScanUtil.hasNextFile()) {
- if (readChunkData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readChunkData() throws IOException {
- while (seriesScanUtil.hasNextChunk()) {
- if (readPageData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readPageData() throws IOException {
- while (seriesScanUtil.hasNextPage()) {
- TsBlock tsBlock = seriesScanUtil.nextPage();
- if (!isEmpty(tsBlock)) {
- appendToBuilder(tsBlock);
- return true;
- }
- }
- return false;
- }
-
- private void appendToBuilder(TsBlock tsBlock) {
+ protected void buildResult(TsBlock tsBlock) {
int size = tsBlock.getPositionCount();
- if (resultTsBlockBuilder.isEmpty()
- && tsBlock.getPositionCount() >=
resultTsBlockBuilder.getMaxTsBlockLineNumber()) {
- retainedTsBlock = tsBlock;
- return;
- }
TimeColumnBuilder timeColumnBuilder =
resultTsBlockBuilder.getTimeColumnBuilder();
TimeColumn timeColumn = tsBlock.getTimeColumn();
for (int i = 0; i < size; i++) {
@@ -214,16 +118,6 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator {
}
}
- private boolean isEmpty(TsBlock tsBlock) {
- return tsBlock == null || tsBlock.isEmpty();
- }
-
- @Override
- protected List<TSDataType> getResultDataTypes() {
- // time + all value columns
- return seriesScanUtil.getTsDataTypeList();
- }
-
@Override
public void initQueryDataSource(IQueryDataSource dataSource) {
seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
index dc8b3c877a5..d4ce1caa624 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
@@ -29,22 +29,15 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileDescriptor;
-import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.TimeColumn;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.utils.RamUsageEstimator;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class SeriesScanOperator extends AbstractDataSourceOperator {
+public class SeriesScanOperator extends AbstractSeriesScanOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(SeriesScanOperator.class);
- private boolean finished = false;
-
public SeriesScanOperator(
OperatorContext context,
PlanNodeId sourceId,
@@ -59,53 +52,6 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
Math.min(maxReturnSize,
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
}
- @Override
- public TsBlock next() throws Exception {
- if (retainedTsBlock != null) {
- return getResultFromRetainedTsBlock();
- }
- resultTsBlock = resultTsBlockBuilder.build();
- resultTsBlockBuilder.reset();
- return checkTsBlockSizeAndGetResult();
- }
-
- @SuppressWarnings("squid:S112")
- @Override
- public boolean hasNext() throws Exception {
- if (retainedTsBlock != null) {
- return true;
- }
- try {
-
- // start stopwatch
- long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
- long start = System.nanoTime();
-
- // here use do-while to promise doing this at least once
- do {
- /*
- * 1. consume page data firstly
- * 2. consume chunk data secondly
- * 3. consume next file finally
- */
- if (!readPageData() && !readChunkData() && !readFileData()) {
- break;
- }
- } while (System.nanoTime() - start < maxRuntime &&
!resultTsBlockBuilder.isFull());
-
- finished = resultTsBlockBuilder.isEmpty();
-
- return !finished;
- } catch (IOException e) {
- throw new RuntimeException("Error happened while scanning the file", e);
- }
- }
-
- @Override
- public boolean isFinished() throws Exception {
- return finished;
- }
-
@Override
public long calculateMaxPeekMemory() {
return Math.max(
@@ -113,53 +59,15 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
}
@Override
- public long calculateMaxReturnSize() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return calculateMaxPeekMemoryWithCounter() - calculateMaxReturnSize();
- }
-
- private boolean readFileData() throws IOException {
- while (seriesScanUtil.hasNextFile()) {
- if (readChunkData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readChunkData() throws IOException {
- while (seriesScanUtil.hasNextChunk()) {
- if (readPageData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readPageData() throws IOException {
- while (seriesScanUtil.hasNextPage()) {
- TsBlock tsBlock = seriesScanUtil.nextPage();
-
- if (!isEmpty(tsBlock)) {
- appendToBuilder(tsBlock);
- return true;
- }
- }
- return false;
- }
-
- private void appendToBuilder(TsBlock tsBlock) {
+ protected void buildResult(TsBlock tsBlock) {
+ int size = tsBlock.getPositionCount();
TimeColumnBuilder timeColumnBuilder =
resultTsBlockBuilder.getTimeColumnBuilder();
TimeColumn timeColumn = tsBlock.getTimeColumn();
ColumnBuilder columnBuilder = resultTsBlockBuilder.getColumnBuilder(0);
Column column = tsBlock.getColumn(0);
if (column.mayHaveNull()) {
- for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+ for (int i = 0; i < size; i++) {
timeColumnBuilder.writeLong(timeColumn.getLong(i));
if (column.isNull(i)) {
columnBuilder.appendNull();
@@ -169,7 +77,7 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
resultTsBlockBuilder.declarePosition();
}
} else {
- for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+ for (int i = 0; i < size; i++) {
timeColumnBuilder.writeLong(timeColumn.getLong(i));
columnBuilder.write(column, i);
resultTsBlockBuilder.declarePosition();
@@ -177,15 +85,6 @@ public class SeriesScanOperator extends AbstractDataSourceOperator {
}
}
- private boolean isEmpty(TsBlock tsBlock) {
- return tsBlock == null || tsBlock.isEmpty();
- }
-
- @Override
- protected List<TSDataType> getResultDataTypes() {
- return seriesScanUtil.getTsDataTypeList();
- }
-
@Override
public long ramBytesUsed() {
return INSTANCE_SIZE