This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/AggOpMemoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit acd9559516bf27aa2b9284d2cf2ef9c421c7cb27 Merge: e64b5c5679 bb9d2ff8c7 Author: Minghui Liu <[email protected]> AuthorDate: Fri Aug 12 09:05:08 2022 +0800 merge master .../resources/conf/iotdb-confignode.properties | 4 +- .../iotdb/confignode/conf/ConfigNodeConfig.java | 2 +- .../iotdb/confignode/manager/ConfigManager.java | 22 +- .../iotdb/confignode/manager/PartitionManager.java | 33 ++- .../iotdb/confignode/manager/load/LoadManager.java | 28 +- .../manager/load/balancer/RouteBalancer.java | 4 + .../partition/GreedyPartitionAllocator.java | 25 +- .../load/balancer/router/LazyGreedyRouter.java | 4 + .../persistence/partition/PartitionInfo.java | 47 ++-- .../partition/StorageGroupPartitionTable.java | 40 ++- .../thrift/ConfigNodeRPCServiceProcessor.java | 3 +- .../thrift/ConfigNodeRPCServiceProcessorTest.java | 284 +------------------ .../Maintenance-Tools/Maintenance-Command.md | 192 ++++++------- .../Maintenance-Tools/Maintenance-Command.md | 191 +++++++------ .../java/org/apache/iotdb/it/env/MppConfig.java | 7 + .../org/apache/iotdb/itbase/env/BaseConfig.java | 8 + .../db/it/IoTDBClusterPartitionTableTest.java | 308 +++++++++++++++++++++ .../integration/IoTDBManageTsFileResourceIT.java | 7 +- .../iotdb/commons/cluster/RegionRoleType.java | 37 +++ .../commons/partition/DataPartitionTable.java | 21 ++ .../commons/partition/SeriesPartitionTable.java | 22 ++ .../resources/conf/iotdb-datanode.properties | 14 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 40 ++- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 39 ++- .../iotdb/db/mpp/common/header/HeaderConstant.java | 4 +- .../operator/process/DeviceMergeOperator.java | 34 +++ .../operator/process/DeviceViewOperator.java | 28 ++ .../operator/process/FilterAndProjectOperator.java | 108 ++++++++ .../execution/config/metadata/ShowRegionTask.java | 1 + .../db/mpp/plan/planner/LocalExecutionPlanner.java | 6 + .../column/multi/MappableUDFColumnTransformer.java | 4 + .../column/ternary/TernaryColumnTransformer.java | 12 + .../dag/column/unary/UnaryColumnTransformer.java | 4 + .../iotdb/db/rescon/TsFileResourceManager.java | 3 +- .../mpp/execution/operator/OperatorMemoryTest.java | 152 ++++++++++ .../iotdb/db/rescon/ResourceManagerTest.java | 10 +- .../src/main/thrift/confignode.thrift | 1 + 37 files changed, 1177 insertions(+), 572 deletions(-) diff --cc server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java index 67be7fd353,c718942108..8f2d891c95 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java @@@ -46,12 -48,15 +50,18 @@@ import org.apache.iotdb.db.mpp.executio import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator; import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator; +import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator; import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; + import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer; + import org.apache.iotdb.db.mpp.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer; + import org.apache.iotdb.db.mpp.transformation.dag.column.binary.CompareLessEqualColumnTransformer; + import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.ConstantColumnTransformer; + import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.TimeColumnTransformer; + import org.apache.iotdb.db.utils.datastructure.TimeSelector; +import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@@ -473,63 -482,141 +487,201 @@@ public class OperatorMemoryTest assertEquals(512, linearFillOperator.calculateRetainedSizeAfterCallingNext()); } + @Test + public void deviceMergeOperatorTest() { + List<Operator> children = new ArrayList<>(4); + List<TSDataType> dataTypeList = new ArrayList<>(2); + dataTypeList.add(TSDataType.INT32); + dataTypeList.add(TSDataType.INT32); + List<String> devices = new ArrayList<>(4); + devices.add("device1"); + devices.add("device2"); + devices.add("device3"); + devices.add("device4"); + long expectedMaxReturnSize = + 3L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + long expectedMaxPeekMemory = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + long expectedRetainedSizeAfterCallingNext = 0; + long childrenMaxPeekMemory = 0; + + for (int i = 0; i < 4; i++) { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L); + Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(64 * 1024L); + expectedMaxPeekMemory += 128 * 1024L; + childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory()); + expectedRetainedSizeAfterCallingNext += 128 * 1024L; + children.add(child); + } + + expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory); + + DeviceMergeOperator deviceMergeOperator = + new DeviceMergeOperator( + Mockito.mock(OperatorContext.class), + devices, + children, + dataTypeList, + Mockito.mock(TimeSelector.class), + Mockito.mock(TimeComparator.class)); + + assertEquals(expectedMaxPeekMemory, deviceMergeOperator.calculateMaxPeekMemory()); + assertEquals(expectedMaxReturnSize, deviceMergeOperator.calculateMaxReturnSize()); + assertEquals( + expectedRetainedSizeAfterCallingNext - 64 * 1024L, + deviceMergeOperator.calculateRetainedSizeAfterCallingNext()); + } + + @Test + public void deviceViewOperatorTest() { + List<Operator> children = new ArrayList<>(4); + List<TSDataType> dataTypeList = new ArrayList<>(2); + dataTypeList.add(TSDataType.INT32); + dataTypeList.add(TSDataType.INT32); + List<String> devices = new ArrayList<>(4); + devices.add("device1"); + devices.add("device2"); + devices.add("device3"); + devices.add("device4"); + long expectedMaxReturnSize = + 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + long expectedMaxPeekMemory = expectedMaxReturnSize; + long expectedRetainedSizeAfterCallingNext = 0; + long childrenMaxPeekMemory = 0; + + for (int i = 0; i < 4; i++) { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()).thenReturn(1024L); + Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(1024L); + expectedMaxPeekMemory += 1024L; + childrenMaxPeekMemory = Math.max(childrenMaxPeekMemory, child.calculateMaxPeekMemory()); + expectedRetainedSizeAfterCallingNext += 1024L; + children.add(child); + } + + expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, childrenMaxPeekMemory); + + DeviceViewOperator deviceViewOperator = + new DeviceViewOperator( + Mockito.mock(OperatorContext.class), + devices, + children, + new ArrayList<>(), + dataTypeList); + + assertEquals(expectedMaxPeekMemory, deviceViewOperator.calculateMaxPeekMemory()); + assertEquals(expectedMaxReturnSize, deviceViewOperator.calculateMaxReturnSize()); + assertEquals( + expectedRetainedSizeAfterCallingNext, + deviceViewOperator.calculateRetainedSizeAfterCallingNext()); + } + + @Test + public void filterAndProjectOperatorTest() { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()).thenReturn(2048L); + Mockito.when(child.calculateMaxReturnSize()).thenReturn(1024L); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(512L); + BooleanType booleanType = Mockito.mock(BooleanType.class); + Mockito.when(booleanType.getTypeEnum()).thenReturn(TypeEnum.BOOLEAN); + LongType longType = Mockito.mock(LongType.class); + Mockito.when(longType.getTypeEnum()).thenReturn(TypeEnum.INT64); + ColumnTransformer filterColumnTransformer = + new CompareLessEqualColumnTransformer( + booleanType, + new TimeColumnTransformer(longType), + new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class))); + List<TSDataType> filterOutputTypes = new ArrayList<>(); + filterOutputTypes.add(TSDataType.INT32); + filterOutputTypes.add(TSDataType.INT64); + List<ColumnTransformer> projectColumnTransformers = new ArrayList<>(); + projectColumnTransformers.add( + new ArithmeticAdditionColumnTransformer( + booleanType, + new TimeColumnTransformer(longType), + new ConstantColumnTransformer(longType, Mockito.mock(IntColumn.class)))); + + FilterAndProjectOperator operator = + new FilterAndProjectOperator( + Mockito.mock(OperatorContext.class), + child, + filterOutputTypes, + new ArrayList<>(), + filterColumnTransformer, + new ArrayList<>(), + new ArrayList<>(), + projectColumnTransformers, + false, + true); + + assertEquals( + 4L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + 512L, + operator.calculateMaxPeekMemory()); + assertEquals( + 2 * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(), + operator.calculateMaxReturnSize()); + assertEquals(512, operator.calculateRetainedSizeAfterCallingNext()); + } ++ + @Test + public void seriesAggregationScanOperatorTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + MeasurementPath measurementPath = + new MeasurementPath( + "root.SeriesAggregationScanOperatorTest.device0.sensor0", TSDataType.INT32); + Set<String> allSensors = Sets.newHashSet("sensor0"); + + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, SeriesAggregationScanOperatorTest.class.getSimpleName()); + + SeriesAggregationScanOperator seriesAggregationScanOperator = + new SeriesAggregationScanOperator( + planNodeId, + measurementPath, + allSensors, + fragmentInstanceContext.getOperatorContexts().get(0), + Arrays.asList( + new Aggregator( + AccumulatorFactory.createAccumulator( + AggregationType.COUNT, TSDataType.INT32, true), + AggregationStep.SINGLE), + new Aggregator( + AccumulatorFactory.createAccumulator( + AggregationType.MAX_VALUE, TSDataType.INT32, true), + AggregationStep.SINGLE), + new Aggregator( + AccumulatorFactory.createAccumulator( + AggregationType.MIN_TIME, TSDataType.INT32, true), + AggregationStep.SINGLE)), + null, + true, + null); + + assertEquals( + 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + seriesAggregationScanOperator.calculateMaxPeekMemory()); + assertEquals( + 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + seriesAggregationScanOperator.calculateMaxReturnSize()); + } catch (IllegalPathException e) { + e.printStackTrace(); + fail(); + } finally { + instanceNotificationExecutor.shutdown(); + } + } }
