This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 553e8de8df [To rel/1.1] [IOTDB-5763] Optimize the memory estimate for
INTO operations (#9599)
553e8de8df is described below
commit 553e8de8df7f679e550ff91c4185c41718e452c3
Author: liuminghui233 <[email protected]>
AuthorDate: Fri Apr 14 00:30:24 2023 +0800
[To rel/1.1] [IOTDB-5763] Optimize the memory estimate for INTO operations
(#9599)
(cherry picked from commit 40f00c6cbb8a03cfa77477f3f8763a1e64a422ae)
---
docs/UserGuide/Reference/Common-Config-Manual.md | 10 +++
.../zh/UserGuide/Reference/Common-Config-Manual.md | 9 +++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 ++++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 12 ++++
.../operator/process/AbstractIntoOperator.java | 64 ++++++++++++------
.../operator/process/DeviceViewIntoOperator.java | 7 +-
.../execution/operator/process/IntoOperator.java | 7 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 14 +---
.../mpp/execution/operator/OperatorMemoryTest.java | 77 ++++++++++++++++++++++
9 files changed, 177 insertions(+), 38 deletions(-)
diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md
b/docs/UserGuide/Reference/Common-Config-Manual.md
index 0c95f5a317..095213a237 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -1401,6 +1401,16 @@ Different configuration parameters take effect in the
following three ways:
### SELECT-INTO
+* into\_operation\_buffer\_size\_in\_byte
+
+| Name | into\_operation\_buffer\_size\_in\_byte
|
+| :---------: |
:----------------------------------------------------------------------------------------------------------------------------------
|
+| Description | The maximum memory that the data to be written may occupy while a
select-into statement is executed (unit: Byte) |
+| Type | int64 |
+| Default | 104857600 (100MB) |
+| Effective | hot-load |
+
+
* select\_into\_insert\_tablet\_plan\_row\_limit
| Name | select\_into\_insert\_tablet\_plan\_row\_limit
|
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index 0068927b3c..98efd6cc05 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1439,6 +1439,15 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
#### SELECT-INTO配置
+* into\_operation\_buffer\_size\_in\_byte
+
+| 名字 | into\_operation\_buffer\_size\_in\_byte
|
+| :----------: |
:-------------------------------------------------------------------- |
+| 描述 | 执行 select-into 语句时,待写入数据占用的最大内存(单位:Byte) |
+| 类型 | int64 |
+| 默认值 | 104857600(100MB) |
+| 改后生效方式 | 热加载 |
+
* select\_into\_insert\_tablet\_plan\_row\_limit
| 名字 | select\_into\_insert\_tablet\_plan\_row\_limit
|
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 24c7b2ee82..5650b54638 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -671,6 +671,9 @@ public class IoTDBConfig {
*/
private long continuousQueryMinimumEveryInterval = 1000;
+ /** How much memory may be used in ONE SELECT INTO operation (in Byte). */
+ private long intoOperationBufferSizeInByte = 100 * 1024 * 1024L;
+
/**
* The maximum number of rows can be processed in insert-tablet-plan when
executing select-into
* statements.
@@ -1878,14 +1881,22 @@ public class IoTDBConfig {
this.continuousQueryMinimumEveryInterval = minimumEveryInterval;
}
- public void setSelectIntoInsertTabletPlanRowLimit(int
selectIntoInsertTabletPlanRowLimit) {
- this.selectIntoInsertTabletPlanRowLimit =
selectIntoInsertTabletPlanRowLimit;
+ public long getIntoOperationBufferSizeInByte() {
+ return intoOperationBufferSizeInByte;
+ }
+
+ public void setIntoOperationBufferSizeInByte(long
intoOperationBufferSizeInByte) {
+ this.intoOperationBufferSizeInByte = intoOperationBufferSizeInByte;
}
public int getSelectIntoInsertTabletPlanRowLimit() {
return selectIntoInsertTabletPlanRowLimit;
}
+ public void setSelectIntoInsertTabletPlanRowLimit(int
selectIntoInsertTabletPlanRowLimit) {
+ this.selectIntoInsertTabletPlanRowLimit =
selectIntoInsertTabletPlanRowLimit;
+ }
+
public int getIntoOperationExecutionThreadCount() {
return intoOperationExecutionThreadCount;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 57ac532d57..0010633da9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -901,6 +901,11 @@ public class IoTDBDescriptor {
// mqtt
loadMqttProps(properties);
+ conf.setIntoOperationBufferSizeInByte(
+ Long.parseLong(
+ properties.getProperty(
+ "into_operation_buffer_size_in_byte",
+ String.valueOf(conf.getIntoOperationBufferSizeInByte()))));
conf.setSelectIntoInsertTabletPlanRowLimit(
Integer.parseInt(
properties.getProperty(
@@ -1454,6 +1459,13 @@ public class IoTDBDescriptor {
properties.getProperty(
"merge_write_throughput_mb_per_sec",
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+
+ // update select into operation max buffer size
+ conf.setIntoOperationBufferSizeInByte(
+ Long.parseLong(
+ properties.getProperty(
+ "into_operation_buffer_size_in_byte",
+ String.valueOf(conf.getIntoOperationBufferSizeInByte()))));
// update insert-tablet-plan's row limit for select-into
conf.setSelectIntoInsertTabletPlanRowLimit(
Integer.parseInt(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 3d65c1659c..8816776d88 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.client.DataNodeInternalClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.IntoProcessException;
@@ -78,8 +79,9 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
protected boolean finished = false;
- private final long maxRetainedSize;
- private final long maxReturnSize;
+ protected int maxRowNumberInStatement;
+ private long maxRetainedSize;
+ private long maxReturnSize;
protected final List<Type> typeConvertors;
@@ -89,7 +91,7 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
List<TSDataType> inputColumnTypes,
Map<String, InputLocation> sourceColumnToInputLocationMap,
ExecutorService intoOperationExecutor,
- long maxStatementSize) {
+ long statementSizePerLine) {
this.operatorContext = operatorContext;
this.child = child;
this.typeConvertors =
@@ -97,7 +99,23 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
this.writeOperationExecutor = intoOperationExecutor;
+ initMemoryEstimates(statementSizePerLine);
+ }
+
+ private void initMemoryEstimates(long statementSizePerLine) {
+ long intoOperationBufferSizeInByte =
+
IoTDBDescriptor.getInstance().getConfig().getIntoOperationBufferSizeInByte();
+ long memAllowedMaxRowNumber = Math.max(intoOperationBufferSizeInByte /
statementSizePerLine, 1);
+ if (memAllowedMaxRowNumber > Integer.MAX_VALUE) {
+ memAllowedMaxRowNumber = Integer.MAX_VALUE;
+ }
+ int maxRowNumberInStatement =
+ Math.min(
+ (int) memAllowedMaxRowNumber,
+
IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit());
+ long maxStatementSize = maxRowNumberInStatement * statementSizePerLine;
+ this.maxRowNumberInStatement = maxRowNumberInStatement;
this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize;
this.maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
}
@@ -209,7 +227,8 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
Map<PartialPath, Map<String, InputLocation>>
targetPathToSourceInputLocationMap,
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
Map<String, Boolean> targetDeviceToAlignedMap,
- List<Type> sourceTypeConvertors) {
+ List<Type> sourceTypeConvertors,
+ int maxRowNumberInStatement) {
List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
new ArrayList<>(targetPathToSourceInputLocationMap.size());
for (Map.Entry<PartialPath, Map<String, InputLocation>> entry :
@@ -221,7 +240,8 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
entry.getValue(),
targetPathToDataTypeMap.get(targetDevice),
targetDeviceToAlignedMap.get(targetDevice.toString()),
- sourceTypeConvertors);
+ sourceTypeConvertors,
+ maxRowNumberInStatement);
insertTabletStatementGenerators.add(generator);
}
return insertTabletStatementGenerators;
@@ -322,10 +342,14 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext();
}
+ @TestOnly
+ public int getMaxRowNumberInStatement() {
+ return maxRowNumberInStatement;
+ }
+
public static class InsertTabletStatementGenerator {
- private final int TABLET_ROW_LIMIT =
-
IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+ private final int rowLimit;
private final PartialPath devicePath;
private final boolean isAligned;
@@ -348,7 +372,8 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
Map<String, InputLocation> measurementToInputLocationMap,
Map<String, TSDataType> measurementToDataTypeMap,
Boolean isAligned,
- List<Type> sourceTypeConvertors) {
+ List<Type> sourceTypeConvertors,
+ int rowLimit) {
this.devicePath = devicePath;
this.isAligned = isAligned;
this.measurements = measurementToInputLocationMap.keySet().toArray(new
String[0]);
@@ -359,32 +384,33 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
writtenCounter.put(measurement, new AtomicInteger(0));
}
this.sourceTypeConvertors = sourceTypeConvertors;
+ this.rowLimit = rowLimit;
this.reset();
}
public void reset() {
this.rowCount = 0;
- this.times = new long[TABLET_ROW_LIMIT];
+ this.times = new long[rowLimit];
this.columns = new Object[this.measurements.length];
for (int i = 0; i < this.measurements.length; i++) {
switch (dataTypes[i]) {
case BOOLEAN:
- columns[i] = new boolean[TABLET_ROW_LIMIT];
+ columns[i] = new boolean[rowLimit];
break;
case INT32:
- columns[i] = new int[TABLET_ROW_LIMIT];
+ columns[i] = new int[rowLimit];
break;
case INT64:
- columns[i] = new long[TABLET_ROW_LIMIT];
+ columns[i] = new long[rowLimit];
break;
case FLOAT:
- columns[i] = new float[TABLET_ROW_LIMIT];
+ columns[i] = new float[rowLimit];
break;
case DOUBLE:
- columns[i] = new double[TABLET_ROW_LIMIT];
+ columns[i] = new double[rowLimit];
break;
case TEXT:
- columns[i] = new Binary[TABLET_ROW_LIMIT];
+ columns[i] = new Binary[rowLimit];
Arrays.fill((Binary[]) columns[i], Binary.EMPTY_VALUE);
break;
default:
@@ -394,7 +420,7 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
}
this.bitMaps = new BitMap[this.measurements.length];
for (int i = 0; i < this.bitMaps.length; ++i) {
- this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT);
+ this.bitMaps[i] = new BitMap(rowLimit);
this.bitMaps[i].markAll();
}
}
@@ -452,7 +478,7 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
++rowCount;
++lastReadIndex;
- if (rowCount == TABLET_ROW_LIMIT) {
+ if (rowCount == rowLimit) {
break;
}
}
@@ -460,7 +486,7 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
}
public boolean isFull() {
- return rowCount == TABLET_ROW_LIMIT;
+ return rowCount == rowLimit;
}
public boolean isEmpty() {
@@ -475,7 +501,7 @@ public abstract class AbstractIntoOperator implements
ProcessOperator {
insertTabletStatement.setDataTypes(dataTypes);
insertTabletStatement.setRowCount(rowCount);
- if (rowCount != TABLET_ROW_LIMIT) {
+ if (rowCount != rowLimit) {
times = Arrays.copyOf(times, rowCount);
for (int i = 0; i < columns.length; i++) {
bitMaps[i] = bitMaps[i].getRegion(0, rowCount);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 0ddd7ccc31..5f2d202219 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -64,14 +64,14 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
Map<String, List<Pair<String, PartialPath>>>
deviceToSourceTargetPathPairListMap,
Map<String, InputLocation> sourceColumnToInputLocationMap,
ExecutorService intoOperationExecutor,
- long maxStatementSize) {
+ long statementSizePerLine) {
super(
operatorContext,
child,
inputColumnTypes,
sourceColumnToInputLocationMap,
intoOperationExecutor,
- maxStatementSize);
+ statementSizePerLine);
this.deviceToTargetPathSourceInputLocationMap =
deviceToTargetPathSourceInputLocationMap;
this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
@@ -148,7 +148,8 @@ public class DeviceViewIntoOperator extends
AbstractIntoOperator {
targetPathToSourceInputLocationMap,
targetPathToDataTypeMap,
targetDeviceToAlignedMap,
- typeConvertors);
+ typeConvertors,
+ maxRowNumberInStatement);
}
private void updateResultTsBlock() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 34a8edc88a..d5e65d262e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -52,21 +52,22 @@ public class IntoOperator extends AbstractIntoOperator {
List<Pair<String, PartialPath>> sourceTargetPathPairList,
Map<String, InputLocation> sourceColumnToInputLocationMap,
ExecutorService intoOperationExecutor,
- long maxStatementSize) {
+ long statementSizePerLine) {
super(
operatorContext,
child,
inputColumnTypes,
sourceColumnToInputLocationMap,
intoOperationExecutor,
- maxStatementSize);
+ statementSizePerLine);
this.sourceTargetPathPairList = sourceTargetPathPairList;
insertTabletStatementGenerators =
constructInsertTabletStatementGenerators(
targetPathToSourceInputLocationMap,
targetPathToDataTypeMap,
targetDeviceToAlignedMap,
- typeConvertors);
+ typeConvertors,
+ maxRowNumberInStatement);
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 110a7c2cfc..2742b4bce2 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
@@ -1622,10 +1621,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
intoPathDescriptor.getTargetPathToDataTypeMap();
-
- int rowLimit =
-
IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
- long maxStatementSize =
calculateStatementSizePerLine(targetPathToDataTypeMap) * rowLimit;
+ long statementSizePerLine =
calculateStatementSizePerLine(targetPathToDataTypeMap);
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
@@ -1639,7 +1635,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
intoPathDescriptor.getSourceTargetPathPairList(),
sourceColumnToInputLocationMap,
FragmentInstanceManager.getInstance().getIntoOperationExecutor(),
- maxStatementSize);
+ statementSizePerLine);
}
@Override
@@ -1680,10 +1676,6 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
calculateStatementSizePerLine(deviceToTargetPathDataTypeMap.get(sourceDevice));
}
- int rowLimit =
-
IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
- long maxStatementSize = statementSizePerLine * rowLimit;
-
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
return new DeviceViewIntoOperator(
operatorContext,
@@ -1695,7 +1687,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(),
sourceColumnToInputLocationMap,
FragmentInstanceManager.getInstance().getIntoOperationExecutor(),
- maxStatementSize);
+ statementSizePerLine);
}
private Map<String, InputLocation>
constructSourceColumnToInputLocationMap(PlanNode node) {
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index d6343d3392..ec3fa4a9b9 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.IntoOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -1422,4 +1423,80 @@ public class OperatorMemoryTest {
expectedMaxRetainSize + expectedChildrenRetainedSize,
aggregationOperator.calculateRetainedSizeAfterCallingNext());
}
+
+ @Test
+ public void intoOperatorTest() {
+ Operator child = Mockito.mock(Operator.class);
+ Mockito.when(child.calculateMaxPeekMemory())
+ .thenReturn((long) DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+ Mockito.when(child.calculateMaxReturnSize())
+ .thenReturn((long) DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+ Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L);
+
+ long statementSizePerLine1 = 8 + 1000 * (4 + 8 + 4 + 8 + 1 + 512);
+ IntoOperator intoOperator1 = createIntoOperator(child,
statementSizePerLine1);
+ int expectedMaxRowNumber = 195;
+ long expectedMaxStatementSize = expectedMaxRowNumber *
statementSizePerLine1;
+ assertEquals(expectedMaxRowNumber,
intoOperator1.getMaxRowNumberInStatement());
+ assertEquals(
+ expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ intoOperator1.calculateMaxPeekMemory());
+ assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
intoOperator1.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ intoOperator1.calculateRetainedSizeAfterCallingNext());
+
+ long statementSizePerLine2 = 8 + 1000 * (4 + 8 + 4 + 8 + 1);
+ IntoOperator intoOperator2 = createIntoOperator(child,
statementSizePerLine2);
+ expectedMaxRowNumber = 4192;
+ expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine2;
+ assertEquals(expectedMaxRowNumber,
intoOperator2.getMaxRowNumberInStatement());
+ assertEquals(
+ expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ intoOperator2.calculateMaxPeekMemory());
+ assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
intoOperator2.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ intoOperator2.calculateRetainedSizeAfterCallingNext());
+
+ long statementSizePerLine3 = 8 + 100 * (4 + 8 + 4 + 8 + 1);
+ IntoOperator intoOperator3 = createIntoOperator(child,
statementSizePerLine3);
+ expectedMaxRowNumber = 10000;
+ expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine3;
+ assertEquals(expectedMaxRowNumber,
intoOperator3.getMaxRowNumberInStatement());
+ assertEquals(
+ expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ intoOperator3.calculateMaxPeekMemory());
+ assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
intoOperator3.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ intoOperator3.calculateRetainedSizeAfterCallingNext());
+
+ long statementSizePerLine4 = 8 + 1000000 * (4 + 8 + 4 + 8 + 1 + 512);
+ IntoOperator intoOperator4 = createIntoOperator(child,
statementSizePerLine4);
+ expectedMaxRowNumber = 1;
+ expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine4;
+ assertEquals(expectedMaxRowNumber,
intoOperator4.getMaxRowNumberInStatement());
+ assertEquals(
+ expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ intoOperator4.calculateMaxPeekMemory());
+ assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
intoOperator4.calculateMaxReturnSize());
+ assertEquals(
+ expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ intoOperator4.calculateRetainedSizeAfterCallingNext());
+ }
+
+ private IntoOperator createIntoOperator(Operator child, long
statementSizePerLine) {
+ return new IntoOperator(
+ Mockito.mock(OperatorContext.class),
+ child,
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ null,
+ statementSizePerLine);
+ }
}