This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/FixIntoMem
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e2901f458d19280f8f3400c49f86655636a2e909 Author: Minghui Liu <[email protected]> AuthorDate: Fri Apr 7 16:55:38 2023 +0800 add UTs --- .../operator/process/AbstractIntoOperator.java | 8 ++- .../mpp/execution/operator/OperatorMemoryTest.java | 77 ++++++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java index 7a439daa3c..8816776d88 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.client.DataNodeInternalClient; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.IntoProcessException; @@ -104,7 +105,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator { private void initMemoryEstimates(long statementSizePerLine) { long intoOperationBufferSizeInByte = IoTDBDescriptor.getInstance().getConfig().getIntoOperationBufferSizeInByte(); - long memAllowedMaxRowNumber = intoOperationBufferSizeInByte / statementSizePerLine; + long memAllowedMaxRowNumber = Math.max(intoOperationBufferSizeInByte / statementSizePerLine, 1); if (memAllowedMaxRowNumber > Integer.MAX_VALUE) { memAllowedMaxRowNumber = Integer.MAX_VALUE; } @@ -341,6 +342,11 @@ public abstract class AbstractIntoOperator implements ProcessOperator { return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext(); } + @TestOnly + public int getMaxRowNumberInStatement() { + return maxRowNumberInStatement; + } + public 
static class InsertTabletStatementGenerator { private final int rowLimit; diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java index d6343d3392..ec3fa4a9b9 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator; import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator; import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator; import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.IntoOperator; import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator; import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator; import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator; @@ -1422,4 +1423,80 @@ public class OperatorMemoryTest { expectedMaxRetainSize + expectedChildrenRetainedSize, aggregationOperator.calculateRetainedSizeAfterCallingNext()); } + + @Test + public void intoOperatorTest() { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()) + .thenReturn((long) DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES); + Mockito.when(child.calculateMaxReturnSize()) + .thenReturn((long) DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L); + + long statementSizePerLine1 = 8 + 1000 * (4 + 8 + 4 + 8 + 1 + 512); + IntoOperator intoOperator1 = createIntoOperator(child, statementSizePerLine1); + int expectedMaxRowNumber = 195; + long expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine1; + assertEquals(expectedMaxRowNumber, 
intoOperator1.getMaxRowNumberInStatement()); + assertEquals( + expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator1.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator1.calculateMaxReturnSize()); + assertEquals( + expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator1.calculateRetainedSizeAfterCallingNext()); + + long statementSizePerLine2 = 8 + 1000 * (4 + 8 + 4 + 8 + 1); + IntoOperator intoOperator2 = createIntoOperator(child, statementSizePerLine2); + expectedMaxRowNumber = 4192; + expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine2; + assertEquals(expectedMaxRowNumber, intoOperator2.getMaxRowNumberInStatement()); + assertEquals( + expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator2.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator2.calculateMaxReturnSize()); + assertEquals( + expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator2.calculateRetainedSizeAfterCallingNext()); + + long statementSizePerLine3 = 8 + 100 * (4 + 8 + 4 + 8 + 1); + IntoOperator intoOperator3 = createIntoOperator(child, statementSizePerLine3); + expectedMaxRowNumber = 10000; + expectedMaxStatementSize = expectedMaxRowNumber * statementSizePerLine3; + assertEquals(expectedMaxRowNumber, intoOperator3.getMaxRowNumberInStatement()); + assertEquals( + expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator3.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator3.calculateMaxReturnSize()); + assertEquals( + expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator3.calculateRetainedSizeAfterCallingNext()); + + long statementSizePerLine4 = 8 + 1000000 * (4 + 8 + 4 + 8 + 1 + 512); + IntoOperator intoOperator4 = createIntoOperator(child, statementSizePerLine4); + expectedMaxRowNumber = 1; + expectedMaxStatementSize = 
expectedMaxRowNumber * statementSizePerLine4; + assertEquals(expectedMaxRowNumber, intoOperator4.getMaxRowNumberInStatement()); + assertEquals( + expectedMaxStatementSize + 3L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator4.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, intoOperator4.calculateMaxReturnSize()); + assertEquals( + expectedMaxStatementSize + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + intoOperator4.calculateRetainedSizeAfterCallingNext()); + } + + private IntoOperator createIntoOperator(Operator child, long statementSizePerLine) { + return new IntoOperator( + Mockito.mock(OperatorContext.class), + child, + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyMap(), + null, + statementSizePerLine); + } }
