This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 802c596e9f add freshness lag when there are no consuming segments
(#8911)
802c596e9f is described below
commit 802c596e9f7b778c72c81e2e144a0182358c1585
Author: Prachi Khobragade <[email protected]>
AuthorDate: Thu Aug 11 09:15:51 2022 -0700
add freshness lag when there are no consuming segments (#8911)
* add freshness lag when there are no consuming segments
The freshness lag metric is currently not emitted when there are no
consuming segments. As a result, no lag is reported when there are errors
while consuming or when a consuming segment has not been created. In that
case, calculate the lag from the end time of the last segment that was
online, and use that as the freshness.
* check related brokermetrics to update
* correct the broker metric, check for _minConsumingFreshnessTimeMs
---
.../query/executor/ServerQueryExecutorV1Impl.java | 72 ++++++++++++++++------
.../pinot/core/query/reduce/BaseReduceService.java | 4 +-
2 files changed, 56 insertions(+), 20 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index fc54265715..b47e2eac48 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -50,6 +50,7 @@ import
org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.common.datatable.DataTableUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
import org.apache.pinot.core.plan.Plan;
@@ -69,6 +70,7 @@ import org.apache.pinot.core.util.trace.TraceContext;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -76,6 +78,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,19 +200,41 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
// Gather stats for realtime consuming segments
int numConsumingSegmentsQueried = 0;
- long minIndexTimeMs = Long.MAX_VALUE;
- long minIngestionTimeMs = Long.MAX_VALUE;
- for (IndexSegment indexSegment : indexSegments) {
- if (indexSegment instanceof MutableSegment) {
- numConsumingSegmentsQueried += 1;
- SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
- long indexTimeMs = segmentMetadata.getLastIndexedTimestamp();
- if (indexTimeMs != Long.MIN_VALUE && indexTimeMs < minIndexTimeMs) {
- minIndexTimeMs = indexTimeMs;
- }
- long ingestionTimeMs = segmentMetadata.getLatestIngestionTimestamp();
- if (ingestionTimeMs != Long.MIN_VALUE && ingestionTimeMs <
minIngestionTimeMs) {
- minIngestionTimeMs = ingestionTimeMs;
+ int numOnlineSegments = 0;
+ long minIndexTimeMs = 0;
+ long minIngestionTimeMs = 0;
+ long maxEndTimeMs = 0;
+ if (tableDataManager instanceof RealtimeTableDataManager) {
+ numConsumingSegmentsQueried = 0;
+ numOnlineSegments = 0;
+ minIndexTimeMs = Long.MAX_VALUE;
+ minIngestionTimeMs = Long.MAX_VALUE;
+ maxEndTimeMs = Long.MIN_VALUE;
+ for (IndexSegment indexSegment : indexSegments) {
+ if (indexSegment instanceof MutableSegment) {
+ numConsumingSegmentsQueried += 1;
+ SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
+ long indexTimeMs = segmentMetadata.getLastIndexedTimestamp();
+ if (indexTimeMs != Long.MIN_VALUE && indexTimeMs < minIndexTimeMs) {
+ minIndexTimeMs = indexTimeMs;
+ }
+ long ingestionTimeMs = segmentMetadata.getLatestIngestionTimestamp();
+ if (ingestionTimeMs != Long.MIN_VALUE && ingestionTimeMs <
minIngestionTimeMs) {
+ minIngestionTimeMs = ingestionTimeMs;
+ }
+ } else if (indexSegment instanceof ImmutableSegment) {
+ SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
+ long indexCreationTime = segmentMetadata.getIndexCreationTime();
+ numOnlineSegments++;
+ if (indexCreationTime != Long.MIN_VALUE) {
+ maxEndTimeMs = Math.max(maxEndTimeMs, indexCreationTime);
+ } else {
+ // NOTE: the endTime may be totally inaccurate based on the value
added in the timeColumn
+ Interval timeInterval = segmentMetadata.getTimeInterval();
+ if (timeInterval != null) {
+ maxEndTimeMs = Math.max(maxEndTimeMs,
timeInterval.getEndMillis());
+ }
+ }
}
}
}
@@ -263,12 +288,21 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
_serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_MISSING_SEGMENTS, numMissingSegments);
}
- if (numConsumingSegmentsQueried > 0) {
- long minConsumingFreshnessTimeMs = minIngestionTimeMs != Long.MAX_VALUE
? minIngestionTimeMs : minIndexTimeMs;
- LOGGER.debug("Request {} queried {} consuming segments with
minConsumingFreshnessTimeMs: {}", requestId,
- numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
- metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
Integer.toString(numConsumingSegmentsQueried));
- metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
Long.toString(minConsumingFreshnessTimeMs));
+ if (tableDataManager instanceof RealtimeTableDataManager) {
+ long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+ if (numConsumingSegmentsQueried > 0) {
+ minConsumingFreshnessTimeMs = minIngestionTimeMs != Long.MAX_VALUE ?
minIngestionTimeMs : minIndexTimeMs;
+ metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
+ Integer.toString(numConsumingSegmentsQueried));
+ metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
Long.toString(minConsumingFreshnessTimeMs));
+ LOGGER.debug("Request {} queried {} consuming segments with
minConsumingFreshnessTimeMs: {}", requestId,
+ numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
+ } else if (numConsumingSegmentsQueried == 0 && maxEndTimeMs !=
Long.MIN_VALUE) {
+ minConsumingFreshnessTimeMs = maxEndTimeMs;
+ metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
Long.toString(maxEndTimeMs));
+ LOGGER.debug("Request {} queried {} consuming segments with
minConsumingFreshnessTimeMs: {}", requestId,
+ numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
+ }
}
LOGGER.debug("Query processing time for request Id - {}: {}", requestId,
queryProcessingTime);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
index 6417228b55..bbad1f985a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java
@@ -310,6 +310,8 @@ public abstract class BaseReduceService {
brokerResponseNative.setExplainPlanNumMatchAllFilterSegments(_explainPlanNumMatchAllFilterSegments);
if (_numConsumingSegmentsQueried > 0) {
brokerResponseNative.setNumConsumingSegmentsQueried(_numConsumingSegmentsQueried);
+ }
+ if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
brokerResponseNative.setMinConsumingFreshnessTimeMs(_minConsumingFreshnessTimeMs);
}
brokerResponseNative.setNumConsumingSegmentsProcessed(_numConsumingSegmentsProcessed);
@@ -340,7 +342,7 @@ public abstract class BaseReduceService {
brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, _realtimeTotalCpuTimeNs,
TimeUnit.NANOSECONDS);
- if (_numConsumingSegmentsQueried > 0 && _minConsumingFreshnessTimeMs >
0) {
+ if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
brokerMetrics.addTimedTableValue(rawTableName,
BrokerTimer.FRESHNESS_LAG_MS,
System.currentTimeMillis() - _minConsumingFreshnessTimeMs,
TimeUnit.MILLISECONDS);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]