This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 802c596e9f add freshness lag when there are no consuming segments 
(#8911)
802c596e9f is described below

commit 802c596e9f7b778c72c81e2e144a0182358c1585
Author: Prachi Khobragade <[email protected]>
AuthorDate: Thu Aug 11 09:15:51 2022 -0700

    add freshness lag when there are no consuming segments (#8911)
    
    * add freshness lag when there are no consuming segments
    
    The freshness lag metric is currently not emitted when there are no
    consuming segments. As a result, no lag is reported when there are errors
    while consuming or when a consuming segment has not been created. In that
    case, calculate the lag from the end time of the last segment that was
    online and use it as the freshness.
    
    * check related broker metrics to update
    
    * correct the broker metric, check for _minConsumingFreshnessTimeMs
---
 .../query/executor/ServerQueryExecutorV1Impl.java  | 72 ++++++++++++++++------
 .../pinot/core/query/reduce/BaseReduceService.java |  4 +-
 2 files changed, 56 insertions(+), 20 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index fc54265715..b47e2eac48 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -50,6 +50,7 @@ import 
org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.common.datatable.DataTableUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
 import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
 import org.apache.pinot.core.plan.Plan;
@@ -69,6 +70,7 @@ import org.apache.pinot.core.util.trace.TraceContext;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -76,6 +78,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -197,19 +200,41 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
 
     // Gather stats for realtime consuming segments
     int numConsumingSegmentsQueried = 0;
-    long minIndexTimeMs = Long.MAX_VALUE;
-    long minIngestionTimeMs = Long.MAX_VALUE;
-    for (IndexSegment indexSegment : indexSegments) {
-      if (indexSegment instanceof MutableSegment) {
-        numConsumingSegmentsQueried += 1;
-        SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
-        long indexTimeMs = segmentMetadata.getLastIndexedTimestamp();
-        if (indexTimeMs != Long.MIN_VALUE && indexTimeMs < minIndexTimeMs) {
-          minIndexTimeMs = indexTimeMs;
-        }
-        long ingestionTimeMs = segmentMetadata.getLatestIngestionTimestamp();
-        if (ingestionTimeMs != Long.MIN_VALUE && ingestionTimeMs < 
minIngestionTimeMs) {
-          minIngestionTimeMs = ingestionTimeMs;
+    int numOnlineSegments = 0;
+    long minIndexTimeMs = 0;
+    long minIngestionTimeMs = 0;
+    long maxEndTimeMs = 0;
+    if (tableDataManager instanceof RealtimeTableDataManager) {
+      numConsumingSegmentsQueried = 0;
+      numOnlineSegments = 0;
+      minIndexTimeMs = Long.MAX_VALUE;
+      minIngestionTimeMs = Long.MAX_VALUE;
+      maxEndTimeMs = Long.MIN_VALUE;
+      for (IndexSegment indexSegment : indexSegments) {
+        if (indexSegment instanceof MutableSegment) {
+          numConsumingSegmentsQueried += 1;
+          SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
+          long indexTimeMs = segmentMetadata.getLastIndexedTimestamp();
+          if (indexTimeMs != Long.MIN_VALUE && indexTimeMs < minIndexTimeMs) {
+            minIndexTimeMs = indexTimeMs;
+          }
+          long ingestionTimeMs = segmentMetadata.getLatestIngestionTimestamp();
+          if (ingestionTimeMs != Long.MIN_VALUE && ingestionTimeMs < 
minIngestionTimeMs) {
+            minIngestionTimeMs = ingestionTimeMs;
+          }
+        } else if (indexSegment instanceof ImmutableSegment) {
+          SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
+          long indexCreationTime = segmentMetadata.getIndexCreationTime();
+          numOnlineSegments++;
+          if (indexCreationTime != Long.MIN_VALUE) {
+            maxEndTimeMs = Math.max(maxEndTimeMs, indexCreationTime);
+          } else {
+            // NOTE: the endTime may be totally inaccurate based on the value 
added in the timeColumn
+            Interval timeInterval = segmentMetadata.getTimeInterval();
+            if (timeInterval != null) {
+              maxEndTimeMs = Math.max(maxEndTimeMs, 
timeInterval.getEndMillis());
+            }
+          }
         }
       }
     }
@@ -263,12 +288,21 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_MISSING_SEGMENTS, numMissingSegments);
     }
 
-    if (numConsumingSegmentsQueried > 0) {
-      long minConsumingFreshnessTimeMs = minIngestionTimeMs != Long.MAX_VALUE 
? minIngestionTimeMs : minIndexTimeMs;
-      LOGGER.debug("Request {} queried {} consuming segments with 
minConsumingFreshnessTimeMs: {}", requestId,
-          numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
-      metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), 
Integer.toString(numConsumingSegmentsQueried));
-      metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), 
Long.toString(minConsumingFreshnessTimeMs));
+    if (tableDataManager instanceof RealtimeTableDataManager) {
+      long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+      if (numConsumingSegmentsQueried > 0) {
+        minConsumingFreshnessTimeMs = minIngestionTimeMs != Long.MAX_VALUE ? 
minIngestionTimeMs : minIndexTimeMs;
+        metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
+            Integer.toString(numConsumingSegmentsQueried));
+        metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), 
Long.toString(minConsumingFreshnessTimeMs));
+        LOGGER.debug("Request {} queried {} consuming segments with 
minConsumingFreshnessTimeMs: {}", requestId,
+            numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
+      } else if (numConsumingSegmentsQueried == 0 && maxEndTimeMs != 
Long.MIN_VALUE) {
+        minConsumingFreshnessTimeMs = maxEndTimeMs;
+        metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), 
Long.toString(maxEndTimeMs));
+        LOGGER.debug("Request {} queried {} consuming segments with 
minConsumingFreshnessTimeMs: {}", requestId,
+            numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
+      }
     }
 
     LOGGER.debug("Query processing time for request Id - {}: {}", requestId, 
queryProcessingTime);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
index 6417228b55..bbad1f985a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -310,6 +310,8 @@ public abstract class BaseReduceService {
       
brokerResponseNative.setExplainPlanNumMatchAllFilterSegments(_explainPlanNumMatchAllFilterSegments);
       if (_numConsumingSegmentsQueried > 0) {
         
brokerResponseNative.setNumConsumingSegmentsQueried(_numConsumingSegmentsQueried);
+      }
+      if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
         
brokerResponseNative.setMinConsumingFreshnessTimeMs(_minConsumingFreshnessTimeMs);
       }
       
brokerResponseNative.setNumConsumingSegmentsProcessed(_numConsumingSegmentsProcessed);
@@ -340,7 +342,7 @@ public abstract class BaseReduceService {
         brokerMetrics.addTimedTableValue(rawTableName, 
BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, _realtimeTotalCpuTimeNs,
             TimeUnit.NANOSECONDS);
 
-        if (_numConsumingSegmentsQueried > 0 && _minConsumingFreshnessTimeMs > 
0) {
+        if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
           brokerMetrics.addTimedTableValue(rawTableName, 
BrokerTimer.FRESHNESS_LAG_MS,
               System.currentTimeMillis() - _minConsumingFreshnessTimeMs, 
TimeUnit.MILLISECONDS);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to