This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ad6a46c  Emit freshness lag at server level as well (#4308)
ad6a46c is described below

commit ad6a46c3e472022ed23764b7f91aa19db97a1092
Author: Sunitha Beeram <[email protected]>
AuthorDate: Mon Jun 17 10:34:09 2019 -0700

    Emit freshness lag at server level as well (#4308)
    
    * Emit freshness lag at server level as well
    
    * Address review comments
---
 .../java/org/apache/pinot/common/metadata/RowMetadata.java    | 11 ++++++++---
 .../java/org/apache/pinot/common/metrics/ServerTimer.java     |  6 +++++-
 .../pinot/core/indexsegment/mutable/MutableSegmentImpl.java   |  3 +--
 .../org/apache/pinot/core/query/scheduler/QueryScheduler.java |  6 ++++++
 .../pinot/core/realtime/stream/StreamMessageMetadata.java     |  6 ++++++
 5 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
index 4e67a05..63be9f1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
@@ -23,16 +23,21 @@ import org.apache.pinot.annotations.InterfaceStability;
 
 
 /**
- * A class that provides relevant row-level metadata for rows ingested into a segment.
+ * A class that provides relevant row-level metadata for rows indexed into a segment.
  *
- * Currently this is relevant for rows ingested into a mutable segment.
+ * Currently this is relevant for rows ingested into a mutable segment - the metadata is expected to be
+ * provided by the underlying stream.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RowMetadata {
 
   /**
-   * Return the ingestion timestamp of the row.
+   * Return the timestamp associated with when the row was ingested upstream.
+   * Expected to be mainly used for stream-based sources.
+   *
+   * @return timestamp (epoch in milliseconds) when the row was ingested upstream
+   *         Long.MIN_VALUE if not available
    */
   long getIngestionTimeMs();
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index a8560f0..326ae10 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -26,7 +26,11 @@ import org.apache.pinot.common.Utils;
  *
  */
 public enum ServerTimer implements AbstractMetrics.Timer {
-  CURRENT_MSG_EVENT_TIMESTAMP_LAG("currentMsgEventTimestampLag", false);
+  // don't see usages for this
+  @Deprecated
+  CURRENT_MSG_EVENT_TIMESTAMP_LAG("currentMsgEventTimestampLag", false),
+  // metric tracking the freshness lag for consuming segments
+  FRESHNESS_LAG_MS("freshnessLagMs", false);
 
   private final String timerName;
   private final boolean global;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 3a09bbb..972d91c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -99,7 +99,6 @@ public class MutableSegmentImpl implements MutableSegment {
   private final int _numKeyColumns;
 
   // default message metadata
-  private static final StreamMessageMetadata _defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis());
   private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
   private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;
 
@@ -243,7 +242,7 @@ public class MutableSegmentImpl implements MutableSegment {
 
     _lastIndexedTimeMs = System.currentTimeMillis();
 
-    if (rowMetadata != null) {
+    if (rowMetadata != null && rowMetadata.getIngestionTimeMs() != Long.MIN_VALUE) {
       _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, rowMetadata.getIngestionTimeMs());
     }
     return canTakeMore;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 51e28ed..24c4ec0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFutureTask;
 import com.google.common.util.concurrent.RateLimiter;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nonnull;
@@ -34,6 +35,7 @@ import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
+import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableImplV2;
@@ -220,6 +222,10 @@ public abstract class QueryScheduler {
       numDroppedLogCounter.incrementAndGet();
     }
 
+    if (minConsumingFreshnessMs > -1) {
+      serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.FRESHNESS_LAG_MS,
+          (System.currentTimeMillis() - minConsumingFreshnessMs), TimeUnit.MILLISECONDS);
+    }
     serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
     serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
     serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
index 0b3ad84..9ad416a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
@@ -29,6 +29,12 @@ public class StreamMessageMetadata implements RowMetadata {
 
   private final long _ingestionTimeMs;
 
+  /**
+   * Construct the stream based message/row message metadata
+   *
+   * @param ingestionTimeMs  the time that the message was ingested by the stream provider
+   *                         use Long.MIN_VALUE if not applicable
+   */
   public StreamMessageMetadata(long ingestionTimeMs) {
     _ingestionTimeMs = ingestionTimeMs;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to