This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/add_more_metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/add_more_metrics by
this push:
new 96e96a4001c add more query metrics
96e96a4001c is described below
commit 96e96a4001c059cca10698cc39a8eec25ae6b4fb
Author: Beyyes <[email protected]>
AuthorDate: Thu Mar 14 18:08:40 2024 +0800
add more query metrics
---
.../fragment/FragmentInstanceContext.java | 3 +++
.../execution/fragment/QueryStatistics.java | 2 ++
.../execution/operator/source/SeriesScanUtil.java | 12 ++++++++++
.../metric/SeriesScanCostMetricSet.java | 26 +++++++++++++++++++++
.../read/reader/chunk/MemChunkReader.java | 6 +++++
.../dataregion/read/reader/common/Element.java | 10 +++++---
.../read/reader/common/PriorityMergeReader.java | 27 ++++++++++++++++++----
.../read/reader/common/FakedSeriesReader.java | 6 +++++
.../apache/iotdb/tsfile/read/common/BatchData.java | 6 +++++
.../iotdb/tsfile/read/common/block/TsBlock.java | 14 +++++++++++
.../iotdb/tsfile/read/reader/IPointReader.java | 2 ++
.../page/LazyLoadAlignedPagePointReader.java | 14 ++++++++---
12 files changed, 117 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 8c74170cef3..ab0f7d7417f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -543,6 +543,9 @@ public class FragmentInstanceContext extends QueryContext {
getQueryStatistics().pageReadersDecodeAlignedDiskTime.get(),
getQueryStatistics().pageReadersDecodeNonAlignedMemTime.get(),
getQueryStatistics().pageReadersDecodeNonAlignedDiskTime.get());
+
+ SeriesScanCostMetricSet.getInstance()
+
.updatePageReaderMemoryUsage(getQueryStatistics().pageReaderMaxUsedMemorySize.get());
}
private void releaseDataNodeQueryContext() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryStatistics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryStatistics.java
index 60cd84b111e..82094467034 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryStatistics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryStatistics.java
@@ -69,6 +69,8 @@ public class QueryStatistics {
public AtomicLong pageReadersDecodeNonAlignedMemCount = new AtomicLong(0);
public AtomicLong pageReadersDecodeNonAlignedMemTime = new AtomicLong(0);
+ public AtomicLong pageReaderMaxUsedMemorySize = new AtomicLong(0);
+
public TQueryStatistics toThrift() {
return new TQueryStatistics(
loadTimeSeriesMetadataDiskSeqCount.get(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index cd1dc52e575..0917ccfb86b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -791,6 +791,10 @@ public class SeriesScanUtil {
firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
firstPageReader.version,
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
+ context
+ .getQueryStatistics()
+ .pageReaderMaxUsedMemorySize
+ .updateAndGet(v -> Math.max(v,
mergeReader.getUsedMemorySize()));
currentPageEndPointTime =
updateEndPointTime(currentPageEndPointTime,
firstPageReader);
firstPageReader = null;
@@ -815,6 +819,10 @@ public class SeriesScanUtil {
getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
pageReader.version,
orderUtils.getOverlapCheckTime(pageReader.getStatistics()));
+ context
+ .getQueryStatistics()
+ .pageReaderMaxUsedMemorySize
+ .updateAndGet(v -> Math.max(v,
mergeReader.getUsedMemorySize()));
currentPageEndPointTime =
updateEndPointTime(currentPageEndPointTime, pageReader);
}
}
@@ -987,6 +995,10 @@ public class SeriesScanUtil {
getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
pageReader.version,
orderUtils.getOverlapCheckTime(pageReader.getStatistics()));
+ context
+ .getQueryStatistics()
+ .pageReaderMaxUsedMemorySize
+ .updateAndGet(v -> Math.max(v, mergeReader.getUsedMemorySize()));
}
private TsBlock nextOverlappedPage() throws IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/SeriesScanCostMetricSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/SeriesScanCostMetricSet.java
index 44df56d7961..32c161af845 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/SeriesScanCostMetricSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/SeriesScanCostMetricSet.java
@@ -970,6 +970,7 @@ public class SeriesScanCostMetricSet implements IMetricSet {
private static final String BUILD_TSBLOCK_FROM_PAGE_READER =
"build_tsblock_from_page_reader";
private static final String HISTOGRAM_BUILD_TSBLOCK_FROM_PAGE_READER =
"histogram_build_tsblock_from_page_reader";
+ private static final String PAGE_READER_MAX_USED_MEMORY_SIZE =
"page_reader_max_used_memory_size";
private Histogram pageReadersDecodeAlignedMemHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
@@ -980,6 +981,8 @@ public class SeriesScanCostMetricSet implements IMetricSet {
private Histogram pageReadersDecodeNonAlignedDiskHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ private Histogram pageReaderMaxMemoryHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+
private Timer pageReadersDecodeAlignedMemTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer pageReadersDecodeAlignedDiskTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer pageReadersDecodeNonAlignedMemTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
@@ -1004,6 +1007,10 @@ public class SeriesScanCostMetricSet implements
IMetricSet {
pageReadersDecodeNonAlignedDiskTimer.updateNanos(nonAlignedDiskTime);
}
+ public void updatePageReaderMemoryUsage(long memorySize) {
+ pageReaderMaxMemoryHistogram.update(memorySize);
+ }
+
private void bindTsBlockFromPageReader(AbstractMetricService metricService) {
pageReadersDecodeAlignedMemHistogram =
metricService.getOrCreateHistogram(
@@ -1086,6 +1093,15 @@ public class SeriesScanCostMetricSet implements
IMetricSet {
NON_ALIGNED,
Tag.FROM.toString(),
DISK);
+
+ pageReaderMaxMemoryHistogram =
+ metricService.getOrCreateHistogram(
+ Metric.SERIES_SCAN_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.STAGE.toString(),
+ HISTOGRAM_BUILD_TSBLOCK_FROM_PAGE_READER,
+ Tag.TYPE.toString(),
+ PAGE_READER_MAX_USED_MEMORY_SIZE);
}
private void unbindTsBlockFromPageReader(AbstractMetricService
metricService) {
@@ -1093,6 +1109,8 @@ public class SeriesScanCostMetricSet implements
IMetricSet {
pageReadersDecodeAlignedDiskHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
pageReadersDecodeNonAlignedMemHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
pageReadersDecodeNonAlignedDiskHistogram =
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+ pageReaderMaxMemoryHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+
pageReadersDecodeAlignedMemTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
pageReadersDecodeAlignedDiskTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
pageReadersDecodeNonAlignedMemTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
@@ -1121,6 +1139,14 @@ public class SeriesScanCostMetricSet implements
IMetricSet {
from);
}
}
+
+ metricService.remove(
+ MetricType.HISTOGRAM,
+ Metric.SERIES_SCAN_COST.toString(),
+ Tag.STAGE.toString(),
+ HISTOGRAM_BUILD_TSBLOCK_FROM_PAGE_READER,
+ Tag.TYPE.toString(),
+ PAGE_READER_MAX_USED_MEMORY_SIZE);
}
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkReader.java
index 90719ade794..4a9844ad2cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkReader.java
@@ -98,6 +98,12 @@ public class MemChunkReader implements IChunkReader,
IPointReader {
return pageReaderList.remove(0).getAllSatisfiedPageData();
}
+ @Override
+ public long getUsedMemorySize() {
+ // not used
+ return timeValuePairIterator.getUsedMemorySize();
+ }
+
@Override
public void close() {
// Do nothing because mem chunk reader will not open files
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/Element.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/Element.java
index 2f73c00a119..746701b8753 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/Element.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/Element.java
@@ -26,9 +26,9 @@ import java.io.IOException;
public class Element {
- public PriorityMergeReader.MergeReaderPriority priority;
- protected IPointReader reader;
- public TimeValuePair timeValuePair;
+ private final PriorityMergeReader.MergeReaderPriority priority;
+ private final IPointReader reader;
+ private TimeValuePair timeValuePair;
public Element(
IPointReader reader,
@@ -67,6 +67,10 @@ public class Element {
return timeValuePair;
}
+ public void setTimeValuePair(TimeValuePair timeValuePair) {
+ this.timeValuePair = timeValuePair;
+ }
+
public PriorityMergeReader.MergeReaderPriority getPriority() {
return priority;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
index de0281e7b6d..330c856ded0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
@@ -36,16 +37,20 @@ public class PriorityMergeReader implements IPointReader {
protected PriorityQueue<Element> heap;
+ protected long usedMemorySize = 0;
+
public PriorityMergeReader() {
heap =
new PriorityQueue<>(
(o1, o2) -> {
int timeCompare =
- Long.compare(o1.timeValuePair.getTimestamp(),
o2.timeValuePair.getTimestamp());
- return timeCompare != 0 ? timeCompare :
o2.priority.compareTo(o1.priority);
+ Long.compare(
+ o1.getTimeValuePair().getTimestamp(),
o2.getTimeValuePair().getTimestamp());
+ return timeCompare != 0 ? timeCompare :
o2.getPriority().compareTo(o1.getPriority());
});
}
+ @TestOnly
public void addReader(IPointReader reader, long priority) throws IOException
{
if (reader.hasNextTimeValuePair()) {
heap.add(
@@ -58,8 +63,10 @@ public class PriorityMergeReader implements IPointReader {
public void addReader(IPointReader reader, MergeReaderPriority priority,
long endTime)
throws IOException {
if (reader.hasNextTimeValuePair()) {
- heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
+ Element element = new Element(reader, reader.nextTimeValuePair(),
priority);
+ heap.add(element);
currentReadStopTime = Math.max(currentReadStopTime, endTime);
+ usedMemorySize += element.getReader().getUsedMemorySize();
} else {
reader.close();
}
@@ -85,8 +92,10 @@ public class PriorityMergeReader implements IPointReader {
}
updateHeap(ret, topNext);
if (topNext != null) {
- top.timeValuePair = topNext;
+ top.setTimeValuePair(topNext);
heap.add(top);
+ } else {
+ usedMemorySize -= top.getReader().getUsedMemorySize();
}
return ret;
}
@@ -111,7 +120,8 @@ public class PriorityMergeReader implements IPointReader {
Element e = heap.poll();
fillNullValue(ret, e.getTimeValuePair());
if (!e.hasNext()) {
- e.reader.close();
+ usedMemorySize -= e.getReader().getUsedMemorySize();
+ e.getReader().close();
continue;
}
e.next();
@@ -122,6 +132,7 @@ public class PriorityMergeReader implements IPointReader {
e.next();
heap.add(e);
} else {
+ usedMemorySize -= e.getReader().getUsedMemorySize();
// the chunk is end
e.close();
}
@@ -136,12 +147,18 @@ public class PriorityMergeReader implements IPointReader {
// do nothing for non-aligned time series
}
+ @Override
+ public long getUsedMemorySize() {
+ return usedMemorySize;
+ }
+
@Override
public void close() throws IOException {
while (!heap.isEmpty()) {
Element e = heap.poll();
e.close();
}
+ usedMemorySize = 0;
}
public static class MergeReaderPriority implements
Comparable<MergeReaderPriority> {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/FakedSeriesReader.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/FakedSeriesReader.java
index 86cae386bb5..ac5d2ceaf4e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/FakedSeriesReader.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/FakedSeriesReader.java
@@ -80,6 +80,12 @@ public class FakedSeriesReader implements IPointReader {
throw new IOException("current() in FakedPrioritySeriesReader is an empty
method.");
}
+ @Override
+ public long getUsedMemorySize() {
+ // not used
+ return 0;
+ }
+
@Override
public void close() {}
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 4d57583d80a..e79da513ac8 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -855,6 +855,12 @@ public class BatchData {
return new TimeValuePair(currentTime(), currentTsPrimitiveType());
}
+ @Override
+ public long getUsedMemorySize() {
+ // not used
+ return 0;
+ }
+
@Override
public void close() {
// do nothing
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 668b396c7bd..54eb7eeb2aa 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -305,6 +305,11 @@ public class TsBlock {
timeColumn.getLong(rowIndex),
valueColumns[columnIndex].getTsPrimitiveType(rowIndex));
}
+ @Override
+ public long getUsedMemorySize() {
+ return getRetainedSizeInBytes();
+ }
+
@Override
public void close() {
// do nothing
@@ -372,6 +377,10 @@ public class TsBlock {
this.rowIndex = rowIndex;
}
+ public long getRes() {
+ return getRetainedSizeInBytes();
+ }
+
@Override
public boolean hasNext() {
return rowIndex < positionCount;
@@ -440,6 +449,11 @@ public class TsBlock {
timeColumn.getLong(rowIndex), new
TsPrimitiveType.TsVector(currentValue()));
}
+ @Override
+ public long getUsedMemorySize() {
+ return getRetainedSizeInBytes();
+ }
+
@Override
public void close() {
// do nothing
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPointReader.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPointReader.java
index d29d70d063b..31dcd1235d7 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPointReader.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPointReader.java
@@ -31,5 +31,7 @@ public interface IPointReader {
TimeValuePair currentTimeValuePair() throws IOException;
+ long getUsedMemorySize();
+
void close() throws IOException;
}
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/LazyLoadAlignedPagePointReader.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/LazyLoadAlignedPagePointReader.java
index e967305e5a5..4636b52ecbf 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/LazyLoadAlignedPagePointReader.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/LazyLoadAlignedPagePointReader.java
@@ -32,8 +32,8 @@ import java.util.List;
*/
public class LazyLoadAlignedPagePointReader implements IPointReader {
- private TimePageReader timeReader;
- private List<ValuePageReader> valueReaders;
+ private final TimePageReader timeReader;
+ private final List<ValuePageReader> valueReaders;
private boolean hasNextRow = false;
@@ -93,5 +93,13 @@ public class LazyLoadAlignedPagePointReader implements
IPointReader {
}
@Override
- public void close() throws IOException {}
+ public long getUsedMemorySize() {
+ // not used
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
}