This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/add_more_metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/add_more_metrics by 
this push:
     new 96e96a4001c add more query metrics
96e96a4001c is described below

commit 96e96a4001c059cca10698cc39a8eec25ae6b4fb
Author: Beyyes <[email protected]>
AuthorDate: Thu Mar 14 18:08:40 2024 +0800

    add more query metrics
---
 .../fragment/FragmentInstanceContext.java          |  3 +++
 .../execution/fragment/QueryStatistics.java        |  2 ++
 .../execution/operator/source/SeriesScanUtil.java  | 12 ++++++++++
 .../metric/SeriesScanCostMetricSet.java            | 26 +++++++++++++++++++++
 .../read/reader/chunk/MemChunkReader.java          |  6 +++++
 .../dataregion/read/reader/common/Element.java     | 10 +++++---
 .../read/reader/common/PriorityMergeReader.java    | 27 ++++++++++++++++++----
 .../read/reader/common/FakedSeriesReader.java      |  6 +++++
 .../apache/iotdb/tsfile/read/common/BatchData.java |  6 +++++
 .../iotdb/tsfile/read/common/block/TsBlock.java    | 14 +++++++++++
 .../iotdb/tsfile/read/reader/IPointReader.java     |  2 ++
 .../page/LazyLoadAlignedPagePointReader.java       | 14 ++++++++---
 12 files changed, 117 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 8c74170cef3..ab0f7d7417f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -543,6 +543,9 @@ public class FragmentInstanceContext extends QueryContext {
             getQueryStatistics().pageReadersDecodeAlignedDiskTime.get(),
             getQueryStatistics().pageReadersDecodeNonAlignedMemTime.get(),
             getQueryStatistics().pageReadersDecodeNonAlignedDiskTime.get());
+
+    SeriesScanCostMetricSet.getInstance()
+        .updatePageReaderMemoryUsage(getQueryStatistics().pageReaderMaxUsedMemorySize.get());
   }
 
   private void releaseDataNodeQueryContext() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryStatistics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryStatistics.java
index 60cd84b111e..82094467034 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryStatistics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryStatistics.java
@@ -69,6 +69,8 @@ public class QueryStatistics {
   public AtomicLong pageReadersDecodeNonAlignedMemCount = new AtomicLong(0);
   public AtomicLong pageReadersDecodeNonAlignedMemTime = new AtomicLong(0);
 
+  public AtomicLong pageReaderMaxUsedMemorySize = new AtomicLong(0);
+
   public TQueryStatistics toThrift() {
     return new TQueryStatistics(
         loadTimeSeriesMetadataDiskSeqCount.get(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index cd1dc52e575..0917ccfb86b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -791,6 +791,10 @@ public class SeriesScanUtil {
                         
firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
                     firstPageReader.version,
                     
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
+                context
+                    .getQueryStatistics()
+                    .pageReaderMaxUsedMemorySize
+                    .updateAndGet(v -> Math.max(v, mergeReader.getUsedMemorySize()));
                 currentPageEndPointTime =
                     updateEndPointTime(currentPageEndPointTime, 
firstPageReader);
                 firstPageReader = null;
@@ -815,6 +819,10 @@ public class SeriesScanUtil {
                     
getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
                     pageReader.version,
                     
orderUtils.getOverlapCheckTime(pageReader.getStatistics()));
+                context
+                    .getQueryStatistics()
+                    .pageReaderMaxUsedMemorySize
+                    .updateAndGet(v -> Math.max(v, mergeReader.getUsedMemorySize()));
                 currentPageEndPointTime = 
updateEndPointTime(currentPageEndPointTime, pageReader);
               }
             }
@@ -987,6 +995,10 @@ public class SeriesScanUtil {
         
getPointReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())),
         pageReader.version,
         orderUtils.getOverlapCheckTime(pageReader.getStatistics()));
+    context
+        .getQueryStatistics()
+        .pageReaderMaxUsedMemorySize
+        .updateAndGet(v -> Math.max(v, mergeReader.getUsedMemorySize()));
   }
 
   private TsBlock nextOverlappedPage() throws IOException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/SeriesScanCostMetricSet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/SeriesScanCostMetricSet.java
index 44df56d7961..32c161af845 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/SeriesScanCostMetricSet.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/SeriesScanCostMetricSet.java
@@ -970,6 +970,7 @@ public class SeriesScanCostMetricSet implements IMetricSet {
   private static final String BUILD_TSBLOCK_FROM_PAGE_READER = 
"build_tsblock_from_page_reader";
   private static final String HISTOGRAM_BUILD_TSBLOCK_FROM_PAGE_READER =
       "histogram_build_tsblock_from_page_reader";
+  private static final String PAGE_READER_MAX_USED_MEMORY_SIZE = "page_reader_max_used_memory_size";
 
   private Histogram pageReadersDecodeAlignedMemHistogram =
       DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
@@ -980,6 +981,8 @@ public class SeriesScanCostMetricSet implements IMetricSet {
   private Histogram pageReadersDecodeNonAlignedDiskHistogram =
       DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
 
+  private Histogram pageReaderMaxMemoryHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+
   private Timer pageReadersDecodeAlignedMemTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer pageReadersDecodeAlignedDiskTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer pageReadersDecodeNonAlignedMemTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
@@ -1004,6 +1007,10 @@ public class SeriesScanCostMetricSet implements 
IMetricSet {
     pageReadersDecodeNonAlignedDiskTimer.updateNanos(nonAlignedDiskTime);
   }
 
+  public void updatePageReaderMemoryUsage(long memorySize) {
+    pageReaderMaxMemoryHistogram.update(memorySize);
+  }
+
   private void bindTsBlockFromPageReader(AbstractMetricService metricService) {
     pageReadersDecodeAlignedMemHistogram =
         metricService.getOrCreateHistogram(
@@ -1086,6 +1093,15 @@ public class SeriesScanCostMetricSet implements 
IMetricSet {
             NON_ALIGNED,
             Tag.FROM.toString(),
             DISK);
+
+    pageReaderMaxMemoryHistogram =
+        metricService.getOrCreateHistogram(
+            Metric.SERIES_SCAN_COST.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.STAGE.toString(),
+            HISTOGRAM_BUILD_TSBLOCK_FROM_PAGE_READER,
+            Tag.TYPE.toString(),
+            PAGE_READER_MAX_USED_MEMORY_SIZE);
   }
 
   private void unbindTsBlockFromPageReader(AbstractMetricService 
metricService) {
@@ -1093,6 +1109,8 @@ public class SeriesScanCostMetricSet implements 
IMetricSet {
     pageReadersDecodeAlignedDiskHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
     pageReadersDecodeNonAlignedMemHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
     pageReadersDecodeNonAlignedDiskHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+    pageReaderMaxMemoryHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+
     pageReadersDecodeAlignedMemTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
     pageReadersDecodeAlignedDiskTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
     pageReadersDecodeNonAlignedMemTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
@@ -1121,6 +1139,14 @@ public class SeriesScanCostMetricSet implements 
IMetricSet {
             from);
       }
     }
+
+    metricService.remove(
+        MetricType.HISTOGRAM,
+        Metric.SERIES_SCAN_COST.toString(),
+        Tag.STAGE.toString(),
+        HISTOGRAM_BUILD_TSBLOCK_FROM_PAGE_READER,
+        Tag.TYPE.toString(),
+        PAGE_READER_MAX_USED_MEMORY_SIZE);
   }
 
   
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkReader.java
index 90719ade794..4a9844ad2cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkReader.java
@@ -98,6 +98,12 @@ public class MemChunkReader implements IChunkReader, 
IPointReader {
     return pageReaderList.remove(0).getAllSatisfiedPageData();
   }
 
+  @Override
+  public long getUsedMemorySize() {
+    // not used
+    return timeValuePairIterator.getUsedMemorySize();
+  }
+
   @Override
   public void close() {
     // Do nothing because mem chunk reader will not open files
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/Element.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/Element.java
index 2f73c00a119..746701b8753 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/Element.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/Element.java
@@ -26,9 +26,9 @@ import java.io.IOException;
 
 public class Element {
 
-  public PriorityMergeReader.MergeReaderPriority priority;
-  protected IPointReader reader;
-  public TimeValuePair timeValuePair;
+  private final PriorityMergeReader.MergeReaderPriority priority;
+  private final IPointReader reader;
+  private TimeValuePair timeValuePair;
 
   public Element(
       IPointReader reader,
@@ -67,6 +67,10 @@ public class Element {
     return timeValuePair;
   }
 
+  public void setTimeValuePair(TimeValuePair timeValuePair) {
+    this.timeValuePair = timeValuePair;
+  }
+
   public PriorityMergeReader.MergeReaderPriority getPriority() {
     return priority;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
index de0281e7b6d..330c856ded0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
@@ -36,16 +37,20 @@ public class PriorityMergeReader implements IPointReader {
 
   protected PriorityQueue<Element> heap;
 
+  protected long usedMemorySize = 0;
+
   public PriorityMergeReader() {
     heap =
         new PriorityQueue<>(
             (o1, o2) -> {
               int timeCompare =
-                  Long.compare(o1.timeValuePair.getTimestamp(), o2.timeValuePair.getTimestamp());
-              return timeCompare != 0 ? timeCompare : o2.priority.compareTo(o1.priority);
+                  Long.compare(
+                      o1.getTimeValuePair().getTimestamp(), o2.getTimeValuePair().getTimestamp());
+              return timeCompare != 0 ? timeCompare : o2.getPriority().compareTo(o1.getPriority());
             });
   }
 
+  @TestOnly
   public void addReader(IPointReader reader, long priority) throws IOException 
{
     if (reader.hasNextTimeValuePair()) {
       heap.add(
@@ -58,8 +63,10 @@ public class PriorityMergeReader implements IPointReader {
   public void addReader(IPointReader reader, MergeReaderPriority priority, 
long endTime)
       throws IOException {
     if (reader.hasNextTimeValuePair()) {
-      heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
+      Element element = new Element(reader, reader.nextTimeValuePair(), priority);
+      heap.add(element);
       currentReadStopTime = Math.max(currentReadStopTime, endTime);
+      usedMemorySize += element.getReader().getUsedMemorySize();
     } else {
       reader.close();
     }
@@ -85,8 +92,10 @@ public class PriorityMergeReader implements IPointReader {
     }
     updateHeap(ret, topNext);
     if (topNext != null) {
-      top.timeValuePair = topNext;
+      top.setTimeValuePair(topNext);
       heap.add(top);
+    } else {
+      usedMemorySize -= top.getReader().getUsedMemorySize();
     }
     return ret;
   }
@@ -111,7 +120,8 @@ public class PriorityMergeReader implements IPointReader {
       Element e = heap.poll();
       fillNullValue(ret, e.getTimeValuePair());
       if (!e.hasNext()) {
-        e.reader.close();
+        usedMemorySize -= e.getReader().getUsedMemorySize();
+        e.getReader().close();
         continue;
       }
       e.next();
@@ -122,6 +132,7 @@ public class PriorityMergeReader implements IPointReader {
           e.next();
           heap.add(e);
         } else {
+          usedMemorySize -= e.getReader().getUsedMemorySize();
           // the chunk is end
           e.close();
         }
@@ -136,12 +147,18 @@ public class PriorityMergeReader implements IPointReader {
     // do nothing for non-aligned time series
   }
 
+  @Override
+  public long getUsedMemorySize() {
+    return usedMemorySize;
+  }
+
   @Override
   public void close() throws IOException {
     while (!heap.isEmpty()) {
       Element e = heap.poll();
       e.close();
     }
+    usedMemorySize = 0;
   }
 
   public static class MergeReaderPriority implements 
Comparable<MergeReaderPriority> {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/FakedSeriesReader.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/FakedSeriesReader.java
index 86cae386bb5..ac5d2ceaf4e 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/FakedSeriesReader.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/FakedSeriesReader.java
@@ -80,6 +80,12 @@ public class FakedSeriesReader implements IPointReader {
     throw new IOException("current() in FakedPrioritySeriesReader is an empty 
method.");
   }
 
+  @Override
+  public long getUsedMemorySize() {
+    // not used
+    return 0;
+  }
+
   @Override
   public void close() {}
 }
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 4d57583d80a..e79da513ac8 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -855,6 +855,12 @@ public class BatchData {
       return new TimeValuePair(currentTime(), currentTsPrimitiveType());
     }
 
+    @Override
+    public long getUsedMemorySize() {
+      // not used
+      return 0;
+    }
+
     @Override
     public void close() {
       // do nothing
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 668b396c7bd..54eb7eeb2aa 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -305,6 +305,11 @@ public class TsBlock {
           timeColumn.getLong(rowIndex), 
valueColumns[columnIndex].getTsPrimitiveType(rowIndex));
     }
 
+    @Override
+    public long getUsedMemorySize() {
+      return getRetainedSizeInBytes();
+    }
+
     @Override
     public void close() {
       // do nothing
@@ -372,6 +377,10 @@ public class TsBlock {
       this.rowIndex = rowIndex;
     }
 
+    public long getRes() {
+      return getRetainedSizeInBytes();
+    }
+
     @Override
     public boolean hasNext() {
       return rowIndex < positionCount;
@@ -440,6 +449,11 @@ public class TsBlock {
           timeColumn.getLong(rowIndex), new 
TsPrimitiveType.TsVector(currentValue()));
     }
 
+    @Override
+    public long getUsedMemorySize() {
+      return getRetainedSizeInBytes();
+    }
+
     @Override
     public void close() {
       // do nothing
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPointReader.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPointReader.java
index d29d70d063b..31dcd1235d7 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPointReader.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/IPointReader.java
@@ -31,5 +31,7 @@ public interface IPointReader {
 
   TimeValuePair currentTimeValuePair() throws IOException;
 
+  long getUsedMemorySize();
+
   void close() throws IOException;
 }
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/LazyLoadAlignedPagePointReader.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/LazyLoadAlignedPagePointReader.java
index e967305e5a5..4636b52ecbf 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/LazyLoadAlignedPagePointReader.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/LazyLoadAlignedPagePointReader.java
@@ -32,8 +32,8 @@ import java.util.List;
  */
 public class LazyLoadAlignedPagePointReader implements IPointReader {
 
-  private TimePageReader timeReader;
-  private List<ValuePageReader> valueReaders;
+  private final TimePageReader timeReader;
+  private final List<ValuePageReader> valueReaders;
 
   private boolean hasNextRow = false;
 
@@ -93,5 +93,13 @@ public class LazyLoadAlignedPagePointReader implements 
IPointReader {
   }
 
   @Override
-  public void close() throws IOException {}
+  public long getUsedMemorySize() {
+    // not used
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // do nothing
+  }
 }

Reply via email to