This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 18d628b  Track "freshness" timestamp across consuming segments (#3979)
18d628b is described below

commit 18d628b8fda8cf27c58854b64b8d4ee177c5334b
Author: Sunitha Beeram <[email protected]>
AuthorDate: Wed May 8 21:47:58 2019 -0700

    Track "freshness" timestamp across consuming segments (#3979)
    
    * Track Indexed timestamp across consuming segments
    
    * Fix when indexing time is updated
    
    * Update based on PR#4025
    
    * Fix tests
    
    * Simplify interfaces
    
    * Update interface annotation
    
    * Address review comments
    
    * Fix names in BrokerResponseNative; Rely on instanceof
    
    * Address review comments
    
    * Fix name
    
    * Fix test
---
 .../apache/pinot/common/metadata/RowMetadata.java  | 30 ++++++------------
 .../response/broker/BrokerResponseNative.java      | 27 +++++++++++++++-
 .../org/apache/pinot/common/utils/DataTable.java   |  2 ++
 .../realtime/HLRealtimeSegmentDataManager.java     |  7 ++--
 .../realtime/LLRealtimeSegmentDataManager.java     |  7 ++--
 .../core/indexsegment/mutable/MutableSegment.java  |  7 ++--
 .../indexsegment/mutable/MutableSegmentImpl.java   | 23 +++++++-------
 .../query/executor/ServerQueryExecutorV1Impl.java  | 37 +++++++++++++++++++++-
 .../core/query/reduce/BrokerReduceService.java     | 19 +++++++++++
 .../impl/kafka/KafkaStreamLevelConsumer.java       |  7 ++--
 .../pinot/core/realtime/stream/MessageBatch.java   |  7 ++--
 .../core/realtime/stream/StreamLevelConsumer.java  | 11 +------
 .../core/realtime/stream/StreamMessageDecoder.java | 36 ---------------------
 .../realtime/stream/StreamMessageMetadata.java     | 25 +++++----------
 .../MutableSegmentImplAggregateMetricsTest.java    |  2 +-
 .../mutable/MutableSegmentImplTest.java            | 15 ++++++++-
 .../tests/LLCRealtimeClusterIntegrationTest.java   |  4 +++
 .../realtime/provisioning/MemoryEstimator.java     |  5 +--
 18 files changed, 148 insertions(+), 123 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
similarity index 64%
copy from pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
index 30c0859..4e67a05 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
@@ -16,35 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.common.metadata;
 
 import org.apache.pinot.annotations.InterfaceAudience;
 import org.apache.pinot.annotations.InterfaceStability;
 
 
 /**
- * A class that provides metadata associated with the message of a stream, for e.g.,
- * ingestion-timestamp of the message.
+ * A class that provides relevant row-level metadata for rows ingested into a segment.
+ *
+ * Currently this is relevant for rows ingested into a mutable segment.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class StreamMessageMetadata {
-
-  private long _ingestionTimestamp = Long.MIN_VALUE;
-
-  public StreamMessageMetadata() {
-
-  }
-
-  public long getIngestionTimestamp() {
-    return _ingestionTimestamp;
-  }
-
-  public void setIngestionTimestamp(long timestamp) {
-    _ingestionTimestamp = timestamp;
-  }
+public interface RowMetadata {
 
-  public void reset() {
-    _ingestionTimestamp = Long.MIN_VALUE;
-  }
+  /**
+   * Return the ingestion timestamp of the row.
+   */
+  long getIngestionTimeMs();
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 94c6d46..214d942 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -39,7 +39,7 @@ import org.apache.pinot.common.utils.JsonUtils;
  *
  * Supports serialization via JSON.
  */
-@JsonPropertyOrder({"selectionResults", "aggregationResults", "exceptions", 
"numServersQueried", "numServersResponded", "numSegmentsQueried", 
"numSegmentsProcessed", "numSegmentsMatched", "numDocsScanned", 
"numEntriesScannedInFilter", "numEntriesScannedPostFilter", 
"numGroupsLimitReached", "totalDocs", "timeUsedMs", "segmentStatistics", 
"traceInfo"})
+@JsonPropertyOrder({"selectionResults", "aggregationResults", "exceptions", 
"numServersQueried", "numServersResponded", "numSegmentsQueried", 
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", 
"numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", 
"numGroupsLimitReached", "totalDocs", "timeUsedMs", "segmentStatistics", 
"traceInfo"})
 public class BrokerResponseNative implements BrokerResponse {
   public static final BrokerResponseNative EMPTY_RESULT = 
BrokerResponseNative.empty();
   public static final BrokerResponseNative NO_TABLE_RESULT =
@@ -53,6 +53,10 @@ public class BrokerResponseNative implements BrokerResponse {
   private long _numSegmentsQueried = 0L;
   private long _numSegmentsProcessed = 0L;
   private long _numSegmentsMatched = 0L;
+  private long _numConsumingSegmentsQueried = 0L;
+  // the timestamp indicating the freshness of the data queried in consuming segments.
+  // This can be ingestion timestamp if provided by the stream, or the last index time
+  private long _minConsumingFreshnessTimeMs = 0L;
 
   private long _totalDocs = 0L;
   private boolean _numGroupsLimitReached = false;
@@ -203,6 +207,27 @@ public class BrokerResponseNative implements 
BrokerResponse {
     _numSegmentsMatched = numSegmentsMatched;
   }
 
+  @JsonProperty("numConsumingSegmentsQueried")
+  public long getNumConsumingSegmentsQueried() {
+    return _numConsumingSegmentsQueried;
+  }
+
+  @JsonProperty("numConsumingSegmentsQueried")
+  public void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried) 
{
+    _numConsumingSegmentsQueried = numConsumingSegmentsQueried;
+  }
+
+  @JsonProperty("minConsumingFreshnessTimeMs")
+  public long getMinConsumingFreshnessTimeMs() {
+    return _minConsumingFreshnessTimeMs;
+  }
+
+  @JsonProperty("minConsumingFreshnessTimeMs")
+  public void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs) 
{
+    _minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
+  }
+
+
   @JsonProperty("totalDocs")
   @Override
   public long getTotalDocs() {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 1a6c49d..5383a3c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -35,6 +35,8 @@ public interface DataTable {
   String NUM_SEGMENTS_QUERIED = "numSegmentsQueried";
   String NUM_SEGMENTS_PROCESSED = "numSegmentsProcessed";
   String NUM_SEGMENTS_MATCHED = "numSegmentsMatched";
+  String NUM_CONSUMING_SEGMENTS_QUERIED = "numConsumingSegmentsQueried";
+  String MIN_CONSUMING_FRESHNESS_TIME_MS = "minConsumingFreshnessTimeMs";
   String TOTAL_DOCS_METADATA_KEY = "totalDocs";
   String NUM_GROUPS_LIMIT_REACHED_KEY = "numGroupsLimitReached";
   String TIME_USED_MS_METADATA_KEY = "timeUsedMs";
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index a8e0bdd..45e0128 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -212,17 +212,16 @@ public class HLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
 
         int numRowsErrored = 0;
         GenericRow consumedRow = null;
-        StreamMessageMetadata messageMetadata = new StreamMessageMetadata();
         do {
           try {
             consumedRow = GenericRow.createOrReuseRow(consumedRow);
-            messageMetadata.reset();
-            consumedRow = _streamLevelConsumer.next(consumedRow, 
messageMetadata);
+            consumedRow = _streamLevelConsumer.next(consumedRow);
 
             if (consumedRow != null) {
               GenericRow transformedRow = 
_recordTransformer.transform(consumedRow);
               if (transformedRow != null) {
-                notFull = realtimeSegment.index(transformedRow, 
messageMetadata);
+                // we currently do not get ingestion data through stream-consumer
+                notFull = realtimeSegment.index(transformedRow, null);
                 exceptionSleepMillis = 50L;
               }
             }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 9f23f89..7fcbd9c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -40,6 +40,7 @@ import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
+import org.apache.pinot.common.metadata.RowMetadata;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -409,7 +410,6 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     int streamMessageCount = 0;
     boolean canTakeMore = true;
     GenericRow decodedRow = null;
-    StreamMessageMetadata msgMetadata = new StreamMessageMetadata();
 
     for (int index = 0; index < messagesAndOffsets.getMessageCount(); index++) 
{
       if (_shouldStop || endCriteriaReached()) {
@@ -438,14 +438,13 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
 
       // Index each message
       decodedRow = GenericRow.createOrReuseRow(decodedRow);
-      msgMetadata.reset();
       // retrieve metadata from the message batch if available
       // this can be overridden by the decoder if there is a better indicator in the message payload
-      messagesAndOffsets.getMetadataAtIndex(index, msgMetadata);
+      RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index);
 
       decodedRow = _messageDecoder
           .decode(messagesAndOffsets.getMessageAtIndex(index), 
messagesAndOffsets.getMessageOffsetAtIndex(index),
-              messagesAndOffsets.getMessageLengthAtIndex(index), decodedRow, 
msgMetadata);
+              messagesAndOffsets.getMessageLengthAtIndex(index), decodedRow);
 
       if (decodedRow != null) {
         try {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
index 9ab8a68..316f265 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
@@ -18,9 +18,10 @@
  */
 package org.apache.pinot.core.indexsegment.mutable;
 
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metadata.RowMetadata;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
 
 
 public interface MutableSegment extends IndexSegment {
@@ -29,10 +30,10 @@ public interface MutableSegment extends IndexSegment {
    * Indexes a record into the segment with optionally provided metadata.
    *
    * @param row Record represented as a {@link GenericRow}
-   * @param msgMetadata the metadata associated with the message
+   * @param rowMetadata the metadata associated with the message
    * @return Whether the segment is full (i.e. cannot index more record into 
it)
    */
-   boolean index(GenericRow row, StreamMessageMetadata msgMetadata);
+   boolean index(GenericRow row, @Nullable RowMetadata rowMetadata);
 
   /**
    * Returns the number of records already indexed into the segment.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index d7abb9e..a92ecfc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -26,9 +26,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.RowMetadata;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.core.data.GenericRow;
@@ -96,9 +98,9 @@ public class MutableSegmentImpl implements MutableSegment {
   private final int _numKeyColumns;
 
   // default message metadata
-  private static final StreamMessageMetadata _defaultMetadata = new 
StreamMessageMetadata();
-  private long _lastIndexedTimestamp = Long.MIN_VALUE;
-  private long _latestIngestionTimestamp = Long.MIN_VALUE;
+  private static final StreamMessageMetadata _defaultMetadata = new 
StreamMessageMetadata(System.currentTimeMillis());
+  private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
+  private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;
 
   public MutableSegmentImpl(RealtimeSegmentConfig config) {
     _segmentName = config.getSegmentName();
@@ -118,12 +120,12 @@ public class MutableSegmentImpl implements MutableSegment 
{
 
       @Override
       public long getLastIndexedTimestamp() {
-        return _lastIndexedTimestamp;
+        return _lastIndexedTimeMs;
       }
 
       @Override
       public long getLatestIngestionTimestamp() {
-        return _latestIngestionTimestamp;
+        return _latestIngestionTimeMs;
       }
     };
 
@@ -211,9 +213,9 @@ public class MutableSegmentImpl implements MutableSegment {
   }
 
   @Override
-  public boolean index(GenericRow row, StreamMessageMetadata msgMetadata) {
+  public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
 
-    boolean canTakeMore = false;
+    boolean canTakeMore;
     // Update dictionary first
     Map<String, Object> dictIdMap = updateDictionary(row);
 
@@ -238,12 +240,11 @@ public class MutableSegmentImpl implements MutableSegment 
{
       canTakeMore = aggregateMetrics(row, docId);
     }
 
-    _lastIndexedTimestamp = System.currentTimeMillis();
+    _lastIndexedTimeMs = System.currentTimeMillis();
 
-    if (msgMetadata != null) {
-      _latestIngestionTimestamp = Math.max(_latestIngestionTimestamp, 
msgMetadata.getIngestionTimestamp());
+    if (rowMetadata != null) {
+      _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs, 
rowMetadata.getIngestionTimeMs());
     }
-
     return canTakeMore;
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 4ab000c..3058539 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
@@ -41,6 +42,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.plan.maker.PlanMaker;
@@ -149,6 +151,34 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       TraceContext.register(requestId);
     }
 
+    int numConsumingSegmentsQueried = 0;
+    long minIndexTimeMs = Long.MAX_VALUE;
+    long minIngestionTimeMs = Long.MAX_VALUE;
+    // gather stats for realtime consuming segments
+    for (SegmentDataManager segmentMgr : segmentDataManagers) {
+      if (segmentMgr.getSegment() instanceof MutableSegment) {
+        numConsumingSegmentsQueried += 1;
+        SegmentMetadata metadata = 
segmentMgr.getSegment().getSegmentMetadata();
+        long indexedTime = metadata.getLastIndexedTimestamp();
+        if (indexedTime != Long.MIN_VALUE && indexedTime < minIndexTimeMs) {
+          minIndexTimeMs = metadata.getLastIndexedTimestamp();
+        }
+        long ingestionTime = metadata.getLatestIngestionTimestamp();
+        if (ingestionTime != Long.MIN_VALUE && ingestionTime < 
minIngestionTimeMs) {
+          minIngestionTimeMs = ingestionTime;
+        }
+      }
+    }
+
+    long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+    if (numConsumingSegmentsQueried > 0) {
+      if (minIngestionTimeMs == Long.MAX_VALUE) {
+        LOGGER.debug("Did not find valid ingestionTimestamp across consuming 
segments! Using indexTime instead");
+        minConsumingFreshnessTimeMs = minIndexTimeMs;
+      }
+      LOGGER.debug("Querying {} consuming segments with min 
minConsumingFreshnessTimeMs {}", numConsumingSegmentsQueried, 
minConsumingFreshnessTimeMs);
+    }
+
     DataTable dataTable = null;
     try {
       TimerContext.Timer segmentPruneTimer = 
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
@@ -219,13 +249,18 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       // Currently, given the deleted segments cache is in-memory only, a 
server restart will reset it
       // We might end up sending partial-response metadata in such cases. It 
appears that the likelihood of
       // this occurence is low; ie, segment has to be retained out and the 
server must be restarted while the
-      // broker view is still behind. We would however like to validate that 
and/or conf control this based on 
+      // broker view is still behind. We would however like to validate that 
and/or conf control this based on
       // data.
       
/*dataTable.addException(QueryException.getException(QueryException.SEGMENTS_MISSING_ERROR,
           "Could not find " + missingSegments + " segments on the server"));*/
       _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_MISSING_SEGMENTS, missingSegments);
     }
 
+    if (numConsumingSegmentsQueried > 0) {
+      dataTable.getMetadata().put(DataTable.NUM_CONSUMING_SEGMENTS_QUERIED, 
Integer.toString(numConsumingSegmentsQueried));
+      dataTable.getMetadata().put(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, 
Long.toString(minConsumingFreshnessTimeMs));
+    }
+
     LOGGER.debug("Query processing time for request Id - {}: {}", requestId, 
queryProcessingTime);
     LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, 
dataTable);
     return dataTable;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index a31ad30..d539bf7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -82,6 +82,8 @@ public class BrokerReduceService implements 
ReduceService<BrokerResponseNative>
     long numSegmentsQueried = 0L;
     long numSegmentsProcessed = 0L;
     long numSegmentsMatched = 0L;
+    long numConsumingSegmentsQueried = 0L;
+    long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
     long numTotalRawDocs = 0L;
     boolean numGroupsLimitReached = false;
 
@@ -136,6 +138,16 @@ public class BrokerReduceService implements 
ReduceService<BrokerResponseNative>
         numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
       }
 
+      String numConsumingString = 
metadata.get(DataTable.NUM_CONSUMING_SEGMENTS_QUERIED);
+      if (numConsumingString != null) {
+        numConsumingSegmentsQueried += Long.parseLong(numConsumingString);
+      }
+
+      String minConsumingIndexTsString = 
metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
+      if (minConsumingIndexTsString != null) {
+        minConsumingFreshnessTimeMs = 
Math.min(Long.parseLong(minConsumingIndexTsString), 
minConsumingFreshnessTimeMs);
+      }
+
       String numTotalRawDocsString = 
metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY);
       if (numTotalRawDocsString != null) {
         numTotalRawDocs += Long.parseLong(numTotalRawDocsString);
@@ -168,6 +180,13 @@ public class BrokerReduceService implements 
ReduceService<BrokerResponseNative>
     brokerResponseNative.setNumSegmentsMatched(numSegmentsMatched);
     brokerResponseNative.setTotalDocs(numTotalRawDocs);
     brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached);
+    if (numConsumingSegmentsQueried > 0) {
+      if (minConsumingFreshnessTimeMs == Long.MAX_VALUE) {
+        LOGGER.error("Invalid lastIndexedTimestamp across {} consuming 
segments", numConsumingSegmentsQueried);
+      }
+      
brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsQueried);
+      
brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs);
+    }
 
     // Update broker metrics.
     String tableName = brokerRequest.getQuerySource().getTableName();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
index d14fe62..29a0cf8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
@@ -22,6 +22,7 @@ import com.yammer.metrics.core.Meter;
 import kafka.consumer.ConsumerIterator;
 import kafka.javaapi.consumer.ConsumerConnector;
 import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.RowMetadata;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -84,14 +85,10 @@ public class KafkaStreamLevelConsumer implements 
StreamLevelConsumer {
 
   @Override
   public GenericRow next(GenericRow destination) {
-    return next(destination, null);
-  }
 
-  @Override
-  public GenericRow next(GenericRow destination, StreamMessageMetadata 
metadata) {
     if (kafkaIterator.hasNext()) {
       try {
-        destination = _messageDecoder.decode(kafkaIterator.next().message(), 
destination, metadata);
+        destination = _messageDecoder.decode(kafkaIterator.next().message(), 
destination);
         tableAndStreamRowsConsumed = _serverMetrics
             .addMeteredTableValue(_tableAndStreamName, 
ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
                 tableAndStreamRowsConsumed);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
index a575cff..09849ca 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.realtime.stream;
 
 import org.apache.pinot.annotations.InterfaceAudience;
 import org.apache.pinot.annotations.InterfaceStability;
+import org.apache.pinot.common.metadata.RowMetadata;
 
 
 /**
@@ -60,11 +61,9 @@ public interface MessageBatch<T> {
   /**
    * Returns the metadata associated with the message at a particular index. This typically includes the timestamp
    * when the message was ingested by the upstream stream-provider and other relevant metadata.
-   *
    */
-  default void getMetadataAtIndex(int index, StreamMessageMetadata metadata) {
-    // update metadata as required - default implementation does nothing
-    return;
+  default RowMetadata getMetadataAtIndex(int index) {
+    return null;
   }
 
   /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
index 6022f77..6b7285b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.realtime.stream;
 
 import org.apache.pinot.annotations.InterfaceAudience;
 import org.apache.pinot.annotations.InterfaceStability;
+import org.apache.pinot.common.metadata.RowMetadata;
 import org.apache.pinot.core.data.GenericRow;
 
 
@@ -45,16 +46,6 @@ public interface StreamLevelConsumer {
   GenericRow next(GenericRow destination);
 
   /**
-   * Get next row from the stream and decode it into a generic row
-   * @param destination the decoded generic row
-   * @param metadata the metadata associated with the decoded stream if any
-   * @return the decoded generic row
-   */
-  default GenericRow next(GenericRow destination, StreamMessageMetadata 
metadata) {
-    return next(destination);
-  }
-
-  /**
    * Commit the offsets consumed so far
    * The next call to consume should exclude all events consumed before the 
commit was called, and start from newer events not yet consumed
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
index 9eef8d5..d95bb7f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
@@ -58,40 +58,4 @@ public interface StreamMessageDecoder<T> {
    * @return A new row decoded from the buffer
    */
   GenericRow decode(T payload, int offset, int length, GenericRow destination);
-
-  /**
-   * Decodes the payload received into a generic row and optionally populates 
any metadata associated
-   * with the message.
-   *
-   * This is an interface default method is provided for backward 
compatibility and should be
-   * overridden by implementations that can provide meaningful metadata.
-   *
-   * @param payload the message payload
-   * @param destination The {@link GenericRow} to write the decoded row into
-   * @param metadata The {@link StreamMessageMetadata} associated with the 
message
-   * @return A new row decoded from the buffer
-   */
-  default GenericRow decode(T payload, GenericRow destination, 
StreamMessageMetadata metadata) {
-    // by default, invoke the non-metadata based method for backwards 
compatibility
-    return decode(payload, destination);
-  }
-
-  /**
-   * Decodes the payload received into a generic row and optionally populates 
any metadata associated
-   * with the message.
-   *
-   * This is an interface default method is provided for backward 
compatibility and should be
-   * overridden by implementations that can provide meaningful metadata.
-   *
-   * @param payload the message payload
-   * @param offset The offset into the array from which the row contents starts
-   * @param length The length of the row contents in bytes
-   * @param destination The {@link GenericRow} to write the decoded row into
-   * @param metadata The {@link StreamMessageMetadata} associated with the 
message
-   * @return A new row decoded from the buffer
-   */
-  default GenericRow decode(T payload, int offset, int length, GenericRow 
destination, StreamMessageMetadata metadata) {
-    // by default, invoke the non-metadata based method for backwards 
compatibility
-    return decode(payload, offset, length, destination);
-  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
index 30c0859..0b3ad84 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
@@ -18,33 +18,24 @@
  */
 package org.apache.pinot.core.realtime.stream;
 
-import org.apache.pinot.annotations.InterfaceAudience;
-import org.apache.pinot.annotations.InterfaceStability;
+import org.apache.pinot.common.metadata.RowMetadata;
 
 
 /**
  * A class that provides metadata associated with the message of a stream, for e.g.,
  * ingestion-timestamp of the message.
  */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class StreamMessageMetadata {
+public class StreamMessageMetadata implements RowMetadata {
 
-  private long _ingestionTimestamp = Long.MIN_VALUE;
+  private final long _ingestionTimeMs;
 
-  public StreamMessageMetadata() {
-
-  }
-
-  public long getIngestionTimestamp() {
-    return _ingestionTimestamp;
+  public StreamMessageMetadata(long ingestionTimeMs) {
+    _ingestionTimeMs = ingestionTimeMs;
   }
 
-  public void setIngestionTimestamp(long timestamp) {
-    _ingestionTimestamp = timestamp;
+  @Override
+  public long getIngestionTimeMs() {
+    return _ingestionTimeMs;
   }
 
-  public void reset() {
-    _ingestionTimestamp = Long.MIN_VALUE;
-  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
index 527464a..ceed347 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
@@ -63,7 +63,7 @@ public class MutableSegmentImplAggregateMetricsTest {
     }
 
     Map<String, Long> expectedValues = new HashMap<>();
-    StreamMessageMetadata defaultMetadata = new StreamMessageMetadata();
+    StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis());
     Random random = new Random();
     for (int i = 0; i < NUM_ROWS; i++) {
       GenericRow row = new GenericRow();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index 7e414c5..af1a0ab 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -54,6 +54,9 @@ public class MutableSegmentImplTest {
   private Schema _schema;
   private MutableSegmentImpl _mutableSegmentImpl;
   private ImmutableSegment _immutableSegment;
+  private long _lastIndexedTs;
+  private long _lastIngestionTimeMs;
+  private long _startTimeMs;
 
   @BeforeClass
   public void setUp()
@@ -74,11 +77,14 @@ public class MutableSegmentImplTest {
     _schema = config.getSchema();
     _mutableSegmentImpl = MutableSegmentImplTestUtils
         .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), false);
-    StreamMessageMetadata defaultMetadata = new StreamMessageMetadata();
+    _lastIngestionTimeMs = System.currentTimeMillis();
+    StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs);
+    _startTimeMs = System.currentTimeMillis();
     try (RecordReader recordReader = new AvroRecordReader(avroFile, _schema)) {
       GenericRow reuse = new GenericRow();
       while (recordReader.hasNext()) {
         _mutableSegmentImpl.index(recordReader.next(reuse), defaultMetadata);
+        _lastIndexedTs = System.currentTimeMillis();
       }
     }
   }
@@ -89,6 +95,13 @@ public class MutableSegmentImplTest {
     SegmentMetadata expectedSegmentMetadata = _immutableSegment.getSegmentMetadata();
     Assert.assertEquals(actualSegmentMetadata.getTotalDocs(), expectedSegmentMetadata.getTotalDocs());
 
+    // assert that the last indexed timestamp is close to what we expect
+    long actualTs = _mutableSegmentImpl.getSegmentMetadata().getLastIndexedTimestamp();
+    Assert.assertTrue(actualTs >= _startTimeMs);
+    Assert.assertTrue(actualTs <= _lastIndexedTs);
+
+    Assert.assertEquals(_mutableSegmentImpl.getSegmentMetadata().getLatestIngestionTimestamp(), _lastIngestionTimeMs);
+
     for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
       String column = fieldSpec.getName();
       DataSourceMetadata actualDataSourceMetadata = _mutableSegmentImpl.getDataSource(column).getDataSourceMetadata();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 87f57c54..14b0c2c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -47,6 +47,7 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
 
   public final boolean _isDirectAlloc = RANDOM.nextBoolean();
   public final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
+  private final long _startTime = System.currentTimeMillis();
 
   private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
       "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
@@ -131,6 +132,9 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
           JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
           // Total docs should not change during reload
           Assert.assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
+          Assert.assertEquals(queryResponse.get("numConsumingSegmentsQueried").asLong(), 2);
+          Assert.assertTrue(queryResponse.get("minConsumingFreshnessTimeMs").asLong() > _startTime);
+          Assert.assertTrue(queryResponse.get("minConsumingFreshnessTimeMs").asLong() < System.currentTimeMillis());
           return queryResponse.get("numEntriesScannedInFilter").asLong() == 0;
         } catch (Exception e) {
           throw new RuntimeException(e);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index 64640a8..7ec4af2 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -129,15 +129,12 @@ public class MemoryEstimator {
     // create mutable segment impl
     MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
 
-    StreamMessageMetadata messageMetadata = new StreamMessageMetadata();
     // read all rows and index them
     try (PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_sampleCompletedSegment);) {
       GenericRow row = new GenericRow();
       while (segmentRecordReader.hasNext()) {
-        messageMetadata.reset();
-
         segmentRecordReader.next(row);
-        mutableSegmentImpl.index(row, messageMetadata);
+        mutableSegmentImpl.index(row, null);
         row.clear();
       }
     } catch (Exception e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to