This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch MemoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 11b7a5f041ea3eafc19d21369b7dc9aac88b8212 Author: zyk990424 <[email protected]> AuthorDate: Fri Aug 12 11:34:57 2022 +0800 add retained size calculation --- .../operator/schema/CountMergeOperator.java | 9 +++++++++ .../operator/schema/DevicesCountOperator.java | 5 +++++ .../schema/LevelTimeSeriesCountOperator.java | 5 +++++ .../schema/NodeManageMemoryMergeOperator.java | 7 ++++++- .../operator/schema/NodePathsConvertOperator.java | 5 +++++ .../operator/schema/NodePathsCountOperator.java | 7 ++++++- .../operator/schema/NodePathsSchemaScanOperator.java | 5 +++++ .../operator/schema/SchemaFetchMergeOperator.java | 9 +++++++++ .../operator/schema/SchemaFetchScanOperator.java | 20 +++++++++++--------- .../operator/schema/SchemaQueryMergeOperator.java | 9 +++++++++ .../schema/SchemaQueryOrderByHeatOperator.java | 14 ++++++++++++++ .../operator/schema/SchemaQueryScanOperator.java | 9 ++++++++- .../operator/schema/TimeSeriesCountOperator.java | 5 +++++ 13 files changed, 97 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java index f10c28a813..7524b7c455 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/CountMergeOperator.java @@ -153,4 +153,13 @@ public class CountMergeOperator implements ProcessOperator { return childrenMaxReturnSize; } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + long retainedSize = 0L; + for (Operator child : children) { + retainedSize += child.calculateRetainedSizeAfterCallingNext(); + } + return retainedSize; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java index 87748916e4..7a22a1573e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/DevicesCountOperator.java @@ -99,4 +99,9 @@ public class DevicesCountOperator implements SourceOperator { // the integer used for count return 4L; } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0L; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java index ff11550d5b..4cc64cff07 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/LevelTimeSeriesCountOperator.java @@ -127,4 +127,9 @@ public class LevelTimeSeriesCountOperator implements SourceOperator { public long calculateMaxReturnSize() { return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0L; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java index 4363735bb8..b7688b023a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java @@ -136,11 +136,16 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator { public long calculateMaxPeekMemory() { // todo calculate the result based on all the scan node; currently, this is shadowed by // schemaQueryMergeNode - return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; } @Override public long calculateMaxReturnSize() { return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize()); } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java index 9be2d3aa72..f240186cdc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsConvertOperator.java @@ -107,4 +107,9 @@ public class NodePathsConvertOperator implements ProcessOperator { public long calculateMaxReturnSize() { return child.calculateMaxReturnSize(); } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return child.calculateRetainedSizeAfterCallingNext(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java index 9e78ef28ed..37b05c4e58 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java @@ -104,11 +104,16 @@ public class NodePathsCountOperator implements ProcessOperator { public long calculateMaxPeekMemory() { // todo calculate the result based on all the scan node; currently, this is shadowed by // schemaQueryMergeNode - return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; } @Override public long calculateMaxReturnSize() { return Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize()); } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java index 54a2d02755..598ed0c953 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsSchemaScanOperator.java @@ -132,4 +132,9 @@ public class NodePathsSchemaScanOperator implements SourceOperator { public long calculateMaxReturnSize() { return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0L; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java index 74c89789e9..96ff8cbc11 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchMergeOperator.java @@ -142,4 +142,13 @@ public class SchemaFetchMergeOperator implements ProcessOperator { return childrenMaxReturnSize; } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + long retainedSize = 0L; + for (Operator child : children) { + retainedSize += child.calculateRetainedSizeAfterCallingNext(); + } + return retainedSize; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java index da10e01a2e..4e07c4178c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaFetchScanOperator.java @@ -57,7 +57,6 @@ public class SchemaFetchScanOperator implements SourceOperator { private final ISchemaRegion schemaRegion; - private TsBlock tsBlock; private boolean isFinished = false; public SchemaFetchScanOperator( @@ -85,12 +84,11 @@ public class SchemaFetchScanOperator implements SourceOperator { } isFinished = true; try { - fetchSchema(); + return fetchSchema(); } catch (MetadataException e) { logger.error("Error occurred during execute SchemaFetchOperator {}", sourceId, e); throw new RuntimeException(e); } - return tsBlock; } @Override @@ -108,7 +106,7 @@ public class SchemaFetchScanOperator implements SourceOperator { return sourceId; } - private void fetchSchema() throws MetadataException { + private TsBlock fetchSchema() throws MetadataException { ClusterSchemaTree schemaTree = new ClusterSchemaTree(); List<PartialPath> partialPathList = patternTree.getAllPathPatterns(); for (PartialPath path : partialPathList) { @@ -124,11 +122,10 @@ public class SchemaFetchScanOperator implements SourceOperator { } catch (IOException e) { // Totally memory operation. This case won't happen. } - this.tsBlock = - new TsBlock( - new TimeColumn(1, new long[] {0}), - new BinaryColumn( - 1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())})); + return new TsBlock( + new TimeColumn(1, new long[] {0}), + new BinaryColumn( + 1, Optional.empty(), new Binary[] {new Binary(outputStream.toByteArray())})); } @Override @@ -140,4 +137,9 @@ public class SchemaFetchScanOperator implements SourceOperator { public long calculateMaxReturnSize() { return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0L; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java index aedf1ab0c6..f2671034e8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryMergeOperator.java @@ -100,4 +100,13 @@ public class SchemaQueryMergeOperator implements ProcessOperator { return childrenMaxReturnSize; } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + long retainedSize = 0L; + for (Operator child : children) { + retainedSize += child.calculateRetainedSizeAfterCallingNext(); + } + return retainedSize; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java index 155bcdb640..778ea8270d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryOrderByHeatOperator.java @@ -203,4 +203,18 @@ public class SchemaQueryOrderByHeatOperator implements ProcessOperator { return maxReturnSize; } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + long retainedSize = 0L; + + for (Operator child : operators) { + retainedSize += child.calculateMaxReturnSize(); + } + + for (Operator child : operators) { + retainedSize += child.calculateRetainedSizeAfterCallingNext(); + } + return retainedSize; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java index bcca707c3e..e9ea46ab5f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/SchemaQueryScanOperator.java @@ -88,7 +88,9 @@ public abstract class SchemaQueryScanOperator implements SourceOperator { @Override public TsBlock next() { hasCachedTsBlock = false; - return tsBlock; + TsBlock result = tsBlock; + tsBlock = null; + return result; } @Override @@ -121,4 +123,9 @@ public abstract class SchemaQueryScanOperator implements SourceOperator { public long calculateMaxReturnSize() { return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0L; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java index 377071ceda..165cfefa71 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/TimeSeriesCountOperator.java @@ -115,4 +115,9 @@ public class TimeSeriesCountOperator implements SourceOperator { // the integer used for count return 4L; } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0L; + } }
