This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/IOTDB_5763_1.1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 085080ff6efe3550b261a13e9826780538fc2d18 Author: liuminghui233 <[email protected]> AuthorDate: Wed Apr 12 10:11:40 2023 +0800 [IOTDB-5763] Optimize the memory estimate for INTO operations (cherry picked from commit 40f00c6cbb8a03cfa77477f3f8763a1e64a422ae) --- docs/UserGuide/Reference/Common-Config-Manual.md | 10 +++ .../zh/UserGuide/Reference/Common-Config-Manual.md | 9 +++ .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 15 ++++- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 12 ++++ .../operator/process/AbstractIntoOperator.java | 64 ++++++++++++------ .../operator/process/DeviceViewIntoOperator.java | 7 +- .../execution/operator/process/IntoOperator.java | 7 +- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 14 +--- .../mpp/execution/operator/OperatorMemoryTest.java | 77 ++++++++++++++++++++++ 9 files changed, 177 insertions(+), 38 deletions(-) diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md index 0c95f5a317..095213a237 100644 --- a/docs/UserGuide/Reference/Common-Config-Manual.md +++ b/docs/UserGuide/Reference/Common-Config-Manual.md @@ -1401,6 +1401,16 @@ Different configuration parameters take effect in the following three ways: ### SELECT-INTO +* into\_operation\_buffer\_size\_in\_byte + +| Name | into\_operation\_buffer\_size\_in\_byte | +| :---------: | :---------------------------------------------------------------------------------------------------------------------------------- | +| Description | When the select-into statement is executed, the maximum memory occupied by the data to be written (unit: Byte) | +| Type | int64 | +| Default | 100MB | +| Effective | hot-load | + + * select\_into\_insert\_tablet\_plan\_row\_limit | Name | select\_into\_insert\_tablet\_plan\_row\_limit | diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md index 0068927b3c..98efd6cc05 100644 --- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md +++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md @@ -1439,6 +1439,15 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。 #### SELECT-INTO配置 +* into\_operation\_buffer\_size\_in\_byte + +| 名字 | into\_operation\_buffer\_size\_in\_byte | +| :----------: | :-------------------------------------------------------------------- | +| 描述 | 执行 select-into 语句时,待写入数据占用的最大内存(单位:Byte) | +| 类型 | int64 | +| 默认值 | 100MB | +| 改后生效方式 | 热加载 | + * select\_into\_insert\_tablet\_plan\_row\_limit | 名字 | select\_into\_insert\_tablet\_plan\_row\_limit | diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 24c7b2ee82..5650b54638 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -671,6 +671,9 @@ public class IoTDBConfig { */ private long continuousQueryMinimumEveryInterval = 1000; + /** How much memory may be used in ONE SELECT INTO operation (in Byte). */ + private long intoOperationBufferSizeInByte = 100 * 1024 * 1024L; + /** * The maximum number of rows can be processed in insert-tablet-plan when executing select-into * statements. @@ -1878,14 +1881,22 @@ public class IoTDBConfig { this.continuousQueryMinimumEveryInterval = minimumEveryInterval; } - public void setSelectIntoInsertTabletPlanRowLimit(int selectIntoInsertTabletPlanRowLimit) { - this.selectIntoInsertTabletPlanRowLimit = selectIntoInsertTabletPlanRowLimit; + public long getIntoOperationBufferSizeInByte() { + return intoOperationBufferSizeInByte; + } + + public void setIntoOperationBufferSizeInByte(long intoOperationBufferSizeInByte) { + this.intoOperationBufferSizeInByte = intoOperationBufferSizeInByte; } public int getSelectIntoInsertTabletPlanRowLimit() { return selectIntoInsertTabletPlanRowLimit; } + public void setSelectIntoInsertTabletPlanRowLimit(int selectIntoInsertTabletPlanRowLimit) { + this.selectIntoInsertTabletPlanRowLimit = selectIntoInsertTabletPlanRowLimit; + } + public int getIntoOperationExecutionThreadCount() { return intoOperationExecutionThreadCount; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 57ac532d57..0010633da9 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -901,6 +901,11 @@ public class IoTDBDescriptor { // mqtt loadMqttProps(properties); + conf.setIntoOperationBufferSizeInByte( + Long.parseLong( + properties.getProperty( + "into_operation_buffer_size_in_byte", + String.valueOf(conf.getIntoOperationBufferSizeInByte())))); conf.setSelectIntoInsertTabletPlanRowLimit( Integer.parseInt( properties.getProperty( @@ -1454,6 +1459,13 @@ public class IoTDBDescriptor { properties.getProperty( "merge_write_throughput_mb_per_sec", Integer.toString(conf.getCompactionWriteThroughputMbPerSec())))); + + // update select into operation max buffer size + conf.setIntoOperationBufferSizeInByte( + Long.parseLong( + properties.getProperty( + "into_operation_buffer_size_in_byte", + String.valueOf(conf.getIntoOperationBufferSizeInByte())))); // update insert-tablet-plan's row limit for select-into conf.setSelectIntoInsertTabletPlanRowLimit( Integer.parseInt( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java index 3d65c1659c..8816776d88 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.client.DataNodeInternalClient; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.IntoProcessException; @@ -78,8 +79,9 @@ public abstract class AbstractIntoOperator implements ProcessOperator { protected boolean finished = false; - private final long maxRetainedSize; - private final long maxReturnSize; + protected int maxRowNumberInStatement; + private long maxRetainedSize; + private long maxReturnSize; protected final List<Type> typeConvertors; @@ -89,7 +91,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator { List<TSDataType> inputColumnTypes, Map<String, InputLocation> sourceColumnToInputLocationMap, ExecutorService intoOperationExecutor, - long maxStatementSize) { + long statementSizePerLine) { this.operatorContext = operatorContext; this.child = child; this.typeConvertors = @@ -97,7 +99,23 @@ public abstract class AbstractIntoOperator implements ProcessOperator { this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap; this.writeOperationExecutor = intoOperationExecutor; + initMemoryEstimates(statementSizePerLine); + } + + private void initMemoryEstimates(long statementSizePerLine) { + long intoOperationBufferSizeInByte = + IoTDBDescriptor.getInstance().getConfig().getIntoOperationBufferSizeInByte(); + long memAllowedMaxRowNumber = Math.max(intoOperationBufferSizeInByte / statementSizePerLine, 1); + if (memAllowedMaxRowNumber > Integer.MAX_VALUE) { + memAllowedMaxRowNumber = Integer.MAX_VALUE; + } + int maxRowNumberInStatement = + Math.min( + (int) memAllowedMaxRowNumber, + IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit()); + long maxStatementSize = maxRowNumberInStatement * statementSizePerLine; + this.maxRowNumberInStatement = maxRowNumberInStatement; this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize; this.maxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; } @@ -209,7 +227,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator { Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap, Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap, Map<String, Boolean> targetDeviceToAlignedMap, - List<Type> sourceTypeConvertors) { + List<Type> sourceTypeConvertors, + int maxRowNumberInStatement) { List<InsertTabletStatementGenerator> insertTabletStatementGenerators = new ArrayList<>(targetPathToSourceInputLocationMap.size()); for (Map.Entry<PartialPath, Map<String, InputLocation>> entry : @@ -221,7 +240,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator { entry.getValue(), targetPathToDataTypeMap.get(targetDevice), targetDeviceToAlignedMap.get(targetDevice.toString()), - sourceTypeConvertors); + sourceTypeConvertors, + maxRowNumberInStatement); insertTabletStatementGenerators.add(generator); } return insertTabletStatementGenerators; @@ -322,10 +342,14 @@ public abstract class AbstractIntoOperator implements ProcessOperator { return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext(); } + @TestOnly + public int getMaxRowNumberInStatement() { + return maxRowNumberInStatement; + } + public static class InsertTabletStatementGenerator { - private final int TABLET_ROW_LIMIT = - IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit(); + private final int rowLimit; private final PartialPath devicePath; private final boolean isAligned; @@ -348,7 +372,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator { Map<String, InputLocation> measurementToInputLocationMap, Map<String, TSDataType> measurementToDataTypeMap, Boolean isAligned, - List<Type> sourceTypeConvertors) { + List<Type> sourceTypeConvertors, + int rowLimit) { this.devicePath = devicePath; this.isAligned = isAligned; this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]); @@ -359,32 +384,33 @@ public abstract class AbstractIntoOperator implements ProcessOperator { writtenCounter.put(measurement, new AtomicInteger(0)); } this.sourceTypeConvertors = sourceTypeConvertors; + this.rowLimit = rowLimit; this.reset(); } public void reset() { this.rowCount = 0; - this.times = new long[TABLET_ROW_LIMIT]; + this.times = new long[rowLimit]; this.columns = new Object[this.measurements.length]; for (int i = 0; i < this.measurements.length; i++) { switch (dataTypes[i]) { case BOOLEAN: - columns[i] = new boolean[TABLET_ROW_LIMIT]; + columns[i] = new boolean[rowLimit]; break; case INT32: - columns[i] = new int[TABLET_ROW_LIMIT]; + columns[i] = new int[rowLimit]; break; case INT64: - columns[i] = new long[TABLET_ROW_LIMIT]; + columns[i] = new long[rowLimit]; break; case FLOAT: - columns[i] = new float[TABLET_ROW_LIMIT]; + columns[i] = new float[rowLimit]; break; case DOUBLE: - columns[i] = new double[TABLET_ROW_LIMIT]; + columns[i] = new double[rowLimit]; break; case TEXT: - columns[i] = new Binary[TABLET_ROW_LIMIT]; + columns[i] = new Binary[rowLimit]; Arrays.fill((Binary[]) columns[i], Binary.EMPTY_VALUE); break; default: @@ -394,7 +420,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator { } this.bitMaps = new BitMap[this.measurements.length]; for (int i = 0; i < this.bitMaps.length; ++i) { - this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT); + this.bitMaps[i] = new BitMap(rowLimit); this.bitMaps[i].markAll(); } } @@ -452,7 +478,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator { ++rowCount; ++lastReadIndex; - if (rowCount == TABLET_ROW_LIMIT) { + if (rowCount == rowLimit) { break; } } @@ -460,7 +486,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator { } public boolean isFull() { - return rowCount == TABLET_ROW_LIMIT; + return rowCount == rowLimit; } public boolean isEmpty() { @@ -475,7 +501,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator { insertTabletStatement.setDataTypes(dataTypes); insertTabletStatement.setRowCount(rowCount); - if (rowCount != TABLET_ROW_LIMIT) { + if (rowCount != rowLimit) { times = Arrays.copyOf(times, rowCount); for (int i = 0; i < columns.length; i++) { bitMaps[i] = bitMaps[i].getRegion(0, rowCount); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java index 0ddd7ccc31..5f2d202219 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java @@ -64,14 +64,14 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap, Map<String, InputLocation> sourceColumnToInputLocationMap, ExecutorService intoOperationExecutor, - long maxStatementSize) { + long statementSizePerLine) { super( operatorContext, child, inputColumnTypes, sourceColumnToInputLocationMap, intoOperationExecutor, - maxStatementSize); + statementSizePerLine); this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap; this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap; this.targetDeviceToAlignedMap = targetDeviceToAlignedMap; @@ -148,7 +148,8 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap, - typeConvertors); + typeConvertors, + maxRowNumberInStatement); } private void updateResultTsBlock() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java index 34a8edc88a..d5e65d262e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java @@ -52,21 +52,22 @@ public class IntoOperator extends AbstractIntoOperator { List<Pair<String, PartialPath>> sourceTargetPathPairList, Map<String, InputLocation> sourceColumnToInputLocationMap, ExecutorService intoOperationExecutor, - long maxStatementSize) { + long statementSizePerLine) { super( operatorContext, child, inputColumnTypes, sourceColumnToInputLocationMap, intoOperationExecutor, - maxStatementSize); + statementSizePerLine); this.sourceTargetPathPairList = sourceTargetPathPairList; insertTabletStatementGenerators = constructInsertTabletStatementGenerators( targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap, - typeConvertors); + typeConvertors, + maxRowNumberInStatement); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 110a7c2cfc..2742b4bce2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache; import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory; @@ -1622,10 +1621,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = intoPathDescriptor.getTargetPathToDataTypeMap(); - - int rowLimit = - IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit(); - long maxStatementSize = calculateStatementSizePerLine(targetPathToDataTypeMap) * rowLimit; + long statementSizePerLine = calculateStatementSizePerLine(targetPathToDataTypeMap); context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); @@ -1639,7 +1635,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP intoPathDescriptor.getSourceTargetPathPairList(), sourceColumnToInputLocationMap, FragmentInstanceManager.getInstance().getIntoOperationExecutor(), - maxStatementSize); + statementSizePerLine); } @Override @@ -1680,10 +1676,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP calculateStatementSizePerLine(deviceToTargetPathDataTypeMap.get(sourceDevice)); } - int rowLimit = - IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit(); - long maxStatementSize = statementSizePerLine * rowLimit; - context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new DeviceViewIntoOperator( operatorContext, @@ -1695,7 +1687,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(), sourceColumnToInputLocationMap, FragmentInstanceManager.getInstance().getIntoOperationExecutor(), - maxStatementSize); + statementSizePerLine); } private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) { diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java index d6343d3392..ec3fa4a9b9 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator; import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator; import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator; import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.IntoOperator; import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator; import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator; import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator; @@ -1422,4 +1423,80 @@ public class OperatorMemoryTest { expectedMaxRetainSize + expectedChildrenRetainedSize, aggregationOperator.calculateRetainedSizeAfterCallingNext()); } + + @Test + public void intoOperatorTest() { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()) + .thenReturn((long) DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES); + Mockito.when(child.calculateMaxReturnSize()) + .thenReturn((long) DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L); + + long statementSizePerLine1 = 8 + 1000 * (4 + 8 + 4 + 8 + 1 + 512); + IntoOperator intoOperator1 = createIntoOperator(child, statementSizePerLine1); + int expectedMaxRowNumber = 195; + long expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine1; + assertEquals(expectedMaxRowNumber, intoOperator1.getMaxRowNumberInStatement()); + assertEquals( + expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator1.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator1.calculateMaxReturnSize()); + assertEquals( + expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator1.calculateRetainedSizeAfterCallingNext()); + + long statementSizePerLine2 = 8 + 1000 * (4 + 8 + 4 + 8 + 1); + IntoOperator intoOperator2 = createIntoOperator(child, statementSizePerLine2); + expectedMaxRowNumber = 4192; + expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine2; + assertEquals(expectedMaxRowNumber, intoOperator2.getMaxRowNumberInStatement()); + assertEquals( + expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator2.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator2.calculateMaxReturnSize()); + assertEquals( + expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator2.calculateRetainedSizeAfterCallingNext()); + + long statementSizePerLine3 = 8 + 100 * (4 + 8 + 4 + 8 + 1); + IntoOperator intoOperator3 = createIntoOperator(child, statementSizePerLine3); + expectedMaxRowNumber = 10000; + expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine3; + assertEquals(expectedMaxRowNumber, intoOperator3.getMaxRowNumberInStatement()); + assertEquals( + expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator3.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator3.calculateMaxReturnSize()); + assertEquals( + expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator3.calculateRetainedSizeAfterCallingNext()); + + long statementSizePerLine4 = 8 + 1000000 * (4 + 8 + 4 + 8 + 1 + 512); + IntoOperator intoOperator4 = createIntoOperator(child, statementSizePerLine4); + expectedMaxRowNumber = 1; + expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine4; + assertEquals(expectedMaxRowNumber, intoOperator4.getMaxRowNumberInStatement()); + assertEquals( + expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator4.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator4.calculateMaxReturnSize()); + assertEquals( + expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator4.calculateRetainedSizeAfterCallingNext()); + } + + private IntoOperator createIntoOperator(Operator child, long statementSizePerLine) { + return new IntoOperator( + Mockito.mock(OperatorContext.class), + child, + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyMap(), + null, + statementSizePerLine); + } }
