This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a1e386  Extend stream interfaces to expose message metadata like 
ingestion timestamp (#4025)
3a1e386 is described below

commit 3a1e386e4ea96d1b8629a89e065eaac274553b6e
Author: Sunitha Beeram <[email protected]>
AuthorDate: Tue Apr 9 10:49:24 2019 -0700

    Extend stream interfaces to expose message metadata like ingestion 
timestamp (#4025)
    
    * Extend stream interfaces to expose message metadata like ingestion 
timestamp
    
    * Update segment metadata interface
    
    * Expose timestamps through segment-metadata instead of IndexSegment
    
    * Fix tracking
    
    * Update MessageBatch API to take stream-metadata object
    
    * Address review comments
    
    * Address review comments
    
    * Minor fixes
    
    * Update MutableSegment interface with just one index method
    
    * Address review comments
---
 .../pinot/common/segment/SegmentMetadata.java      | 16 +++++++
 .../realtime/HLRealtimeSegmentDataManager.java     |  7 ++-
 .../realtime/LLRealtimeSegmentDataManager.java     | 11 ++++-
 .../core/indexsegment/mutable/MutableSegment.java  |  6 ++-
 .../indexsegment/mutable/MutableSegmentImpl.java   | 33 ++++++++++++--
 .../impl/kafka/KafkaStreamLevelConsumer.java       |  8 +++-
 .../pinot/core/realtime/stream/MessageBatch.java   | 10 ++++
 .../core/realtime/stream/StreamLevelConsumer.java  | 10 ++++
 .../core/realtime/stream/StreamMessageDecoder.java | 36 +++++++++++++++
 ...velConsumer.java => StreamMessageMetadata.java} | 53 +++++++++-------------
 .../core/segment/index/SegmentMetadataImpl.java    | 14 ++++++
 .../MutableSegmentImplAggregateMetricsTest.java    |  4 +-
 .../mutable/MutableSegmentImplTest.java            |  4 +-
 .../realtime/provisioning/MemoryEstimator.java     |  6 ++-
 14 files changed, 174 insertions(+), 44 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/segment/SegmentMetadata.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/segment/SegmentMetadata.java
index 68d6ffb..9368ee2 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/segment/SegmentMetadata.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/segment/SegmentMetadata.java
@@ -73,6 +73,22 @@ public interface SegmentMetadata {
    */
   long getRefreshTime();
 
+  /**
+   * Return the last time a record was indexed in this segment. Applicable for 
MutableSegments.
+   *
+   * @return time when the last record was indexed
+   */
+  long getLastIndexedTimestamp();
+
+  /**
+   * Return the latest ingestion timestamp associated with the records indexed 
in this segment.
+   * Applicable for MutableSegments.
+   *
+   * @return latest timestamp associated with indexed records
+   *         <code>Long.MIN_VALUE</code> if the stream doesn't provide a 
timestamp
+   */
+  long getLatestIngestionTimestamp();
+
   boolean hasDictionary(String columnName);
 
   boolean hasStarTree();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 745115b..a8e0bdd 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -51,6 +51,7 @@ import org.apache.pinot.core.realtime.stream.StreamConfig;
 import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
 import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -211,15 +212,17 @@ public class HLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
 
         int numRowsErrored = 0;
         GenericRow consumedRow = null;
+        StreamMessageMetadata messageMetadata = new StreamMessageMetadata();
         do {
           try {
             consumedRow = GenericRow.createOrReuseRow(consumedRow);
-            consumedRow = _streamLevelConsumer.next(consumedRow);
+            messageMetadata.reset();
+            consumedRow = _streamLevelConsumer.next(consumedRow, 
messageMetadata);
 
             if (consumedRow != null) {
               GenericRow transformedRow = 
_recordTransformer.transform(consumedRow);
               if (transformedRow != null) {
-                notFull = realtimeSegment.index(transformedRow);
+                notFull = realtimeSegment.index(transformedRow, 
messageMetadata);
                 exceptionSleepMillis = 50L;
               }
             }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 50ee54e..9f23f89 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -67,6 +67,7 @@ import 
org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
 import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
 import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
 import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
 import org.apache.pinot.core.realtime.stream.TransientConsumerException;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
@@ -408,6 +409,8 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     int streamMessageCount = 0;
     boolean canTakeMore = true;
     GenericRow decodedRow = null;
+    StreamMessageMetadata msgMetadata = new StreamMessageMetadata();
+
     for (int index = 0; index < messagesAndOffsets.getMessageCount(); index++) 
{
       if (_shouldStop || endCriteriaReached()) {
         break;
@@ -435,10 +438,14 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
 
       // Index each message
       decodedRow = GenericRow.createOrReuseRow(decodedRow);
+      msgMetadata.reset();
+      // retrieve metadata from the message batch if available
+      // this can be overridden by the decoder if there is a better indicator 
in the message payload
+      messagesAndOffsets.getMetadataAtIndex(index, msgMetadata);
 
       decodedRow = _messageDecoder
           .decode(messagesAndOffsets.getMessageAtIndex(index), 
messagesAndOffsets.getMessageOffsetAtIndex(index),
-              messagesAndOffsets.getMessageLengthAtIndex(index), decodedRow);
+              messagesAndOffsets.getMessageLengthAtIndex(index), decodedRow, 
msgMetadata);
 
       if (decodedRow != null) {
         try {
@@ -454,7 +461,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
                     realtimeRowsDroppedMeter);
           }
 
-          canTakeMore = _realtimeSegment.index(transformedRow);
+          canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
         } catch (Exception e) {
           segmentLogger.error("Caught exception while transforming the record: 
{}", decodedRow, e);
           _numRowsErrored++;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
index 10f915b..9ab8a68 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
@@ -20,17 +20,19 @@ package org.apache.pinot.core.indexsegment.mutable;
 
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
 
 
 public interface MutableSegment extends IndexSegment {
 
   /**
-   * Indexes a record into the segment.
+   * Indexes a record into the segment with optionally provided metadata.
    *
    * @param row Record represented as a {@link GenericRow}
+   * @param msgMetadata the metadata associated with the message
    * @return Whether the segment is full (i.e. cannot index more record into 
it)
    */
-  boolean index(GenericRow row);
+   boolean index(GenericRow row, StreamMessageMetadata msgMetadata);
 
   /**
    * Returns the number of records already indexed into the segment.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 1b554e8..d7abb9e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -42,6 +42,7 @@ import 
org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionary;
 import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
 import 
org.apache.pinot.core.realtime.impl.invertedindex.RealtimeInvertedIndexReader;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;
@@ -94,6 +95,11 @@ public class MutableSegmentImpl implements MutableSegment {
   private volatile long _maxTime = Long.MIN_VALUE;
   private final int _numKeyColumns;
 
+  // default message metadata
+  private static final StreamMessageMetadata _defaultMetadata = new 
StreamMessageMetadata();
+  private long _lastIndexedTimestamp = Long.MIN_VALUE;
+  private long _latestIngestionTimestamp = Long.MIN_VALUE;
+
   public MutableSegmentImpl(RealtimeSegmentConfig config) {
     _segmentName = config.getSegmentName();
     _schema = config.getSchema();
@@ -109,6 +115,16 @@ public class MutableSegmentImpl implements MutableSegment {
         // In realtime total docs and total raw docs are the same currently.
         return _numDocsIndexed;
       }
+
+      @Override
+      public long getLastIndexedTimestamp() {
+        return _lastIndexedTimestamp;
+      }
+
+      @Override
+      public long getLatestIngestionTimestamp() {
+        return _latestIngestionTimestamp;
+      }
     };
 
     _offHeap = config.isOffHeap();
@@ -195,7 +211,9 @@ public class MutableSegmentImpl implements MutableSegment {
   }
 
   @Override
-  public boolean index(GenericRow row) {
+  public boolean index(GenericRow row, StreamMessageMetadata msgMetadata) {
+
+    boolean canTakeMore = false;
     // Update dictionary first
     Map<String, Object> dictIdMap = updateDictionary(row);
 
@@ -210,14 +228,23 @@ public class MutableSegmentImpl implements MutableSegment 
{
       // Add forward and inverted indices for new document.
       addForwardIndex(row, docId, dictIdMap);
       addInvertedIndex(docId, dictIdMap);
+
       // Update number of document indexed at last to make the latest record 
queryable
-      return _numDocsIndexed++ < _capacity;
+      canTakeMore = _numDocsIndexed++ < _capacity;
     } else {
       Preconditions
           .checkState(_aggregateMetrics, "Invalid document-id during indexing: 
" + docId + " expected: " + numDocs);
       // Update metrics for existing document.
-      return aggregateMetrics(row, docId);
+      canTakeMore = aggregateMetrics(row, docId);
+    }
+
+    _lastIndexedTimestamp = System.currentTimeMillis();
+
+    if (msgMetadata != null) {
+      _latestIngestionTimestamp = Math.max(_latestIngestionTimestamp, 
msgMetadata.getIngestionTimestamp());
     }
+
+    return canTakeMore;
   }
 
   private Map<String, Object> updateDictionary(GenericRow row) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
index ce2b632..d14fe62 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
@@ -30,6 +30,7 @@ import org.apache.pinot.core.realtime.stream.StreamConfig;
 import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
 import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
 import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,9 +84,14 @@ public class KafkaStreamLevelConsumer implements 
StreamLevelConsumer {
 
   @Override
   public GenericRow next(GenericRow destination) {
+    return next(destination, null);
+  }
+
+  @Override
+  public GenericRow next(GenericRow destination, StreamMessageMetadata 
metadata) {
     if (kafkaIterator.hasNext()) {
       try {
-        destination = _messageDecoder.decode(kafkaIterator.next().message(), 
destination);
+        destination = _messageDecoder.decode(kafkaIterator.next().message(), 
destination, metadata);
         tableAndStreamRowsConsumed = _serverMetrics
             .addMeteredTableValue(_tableAndStreamName, 
ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
                 tableAndStreamRowsConsumed);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
index 86c722b..a575cff 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
@@ -58,6 +58,16 @@ public interface MessageBatch<T> {
   int getMessageLengthAtIndex(int index);
 
   /**
+   * Returns the metadata associated with the message at a particular index. 
This typically includes the timestamp
+   * when the message was ingested by the upstream stream-provider and other 
relevant metadata.
+   *
+   */
+  default void getMetadataAtIndex(int index, StreamMessageMetadata metadata) {
+    // update metadata as required - default implementation does nothing
+    return;
+  }
+
+  /**
    * Returns the offset of the next message.
    * @param index
    * @return
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
index fc8cc19..6022f77 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
@@ -45,6 +45,16 @@ public interface StreamLevelConsumer {
   GenericRow next(GenericRow destination);
 
   /**
+   * Get next row from the stream and decode it into a generic row
+   * @param destination the decoded generic row
+   * @param metadata the metadata associated with the decoded stream if any
+   * @return the decoded generic row
+   */
+  default GenericRow next(GenericRow destination, StreamMessageMetadata 
metadata) {
+    return next(destination);
+  }
+
+  /**
    * Commit the offsets consumed so far
    * The next call to consume should exclude all events consumed before the 
commit was called, and start from newer events not yet consumed
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
index d95bb7f..9eef8d5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
@@ -58,4 +58,40 @@ public interface StreamMessageDecoder<T> {
    * @return A new row decoded from the buffer
    */
   GenericRow decode(T payload, int offset, int length, GenericRow destination);
+
+  /**
+   * Decodes the payload received into a generic row and optionally populates 
any metadata associated
+   * with the message.
+   *
+   * This interface default method is provided for backward 
compatibility and should be
+   * overridden by implementations that can provide meaningful metadata.
+   *
+   * @param payload the message payload
+   * @param destination The {@link GenericRow} to write the decoded row into
+   * @param metadata The {@link StreamMessageMetadata} associated with the 
message
+   * @return A new row decoded from the buffer
+   */
+  default GenericRow decode(T payload, GenericRow destination, 
StreamMessageMetadata metadata) {
+    // by default, invoke the non-metadata based method for backwards 
compatibility
+    return decode(payload, destination);
+  }
+
+  /**
+   * Decodes the payload received into a generic row and optionally populates 
any metadata associated
+   * with the message.
+   *
+   * This interface default method is provided for backward 
compatibility and should be
+   * overridden by implementations that can provide meaningful metadata.
+   *
+   * @param payload the message payload
+   * @param offset The offset into the array from which the row contents starts
+   * @param length The length of the row contents in bytes
+   * @param destination The {@link GenericRow} to write the decoded row into
+   * @param metadata The {@link StreamMessageMetadata} associated with the 
message
+   * @return A new row decoded from the buffer
+   */
+  default GenericRow decode(T payload, int offset, int length, GenericRow 
destination, StreamMessageMetadata metadata) {
+    // by default, invoke the non-metadata based method for backwards 
compatibility
+    return decode(payload, offset, length, destination);
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
similarity index 55%
copy from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
index fc8cc19..30c0859 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
@@ -20,40 +20,31 @@ package org.apache.pinot.core.realtime.stream;
 
 import org.apache.pinot.annotations.InterfaceAudience;
 import org.apache.pinot.annotations.InterfaceStability;
-import org.apache.pinot.core.data.GenericRow;
 
 
 /**
- * Interface for a consumer that consumes at stream level and is unaware of 
any partitions of the stream
+ * A class that provides metadata associated with the message of a stream, 
e.g.,
+ * ingestion-timestamp of the message.
  */
 @InterfaceAudience.Public
-@InterfaceStability.Stable
-public interface StreamLevelConsumer {
-
-  /**
-   * Initialize and start the stream level consumer
-   * @throws Exception
-   */
-  void start()
-      throws Exception;
-
-  /**
-   * Get next row from the stream and decode it into a generic row
-   * @param destination
-   * @return
-   */
-  GenericRow next(GenericRow destination);
-
-  /**
-   * Commit the offsets consumed so far
-   * The next call to consume should exclude all events consumed before the 
commit was called, and start from newer events not yet consumed
-   */
-  void commit();
-
-  /**
-   * Shutdown the stream consumer
-   * @throws Exception
-   */
-  void shutdown()
-      throws Exception;
+@InterfaceStability.Evolving
+public class StreamMessageMetadata {
+
+  private long _ingestionTimestamp = Long.MIN_VALUE;
+
+  public StreamMessageMetadata() {
+
+  }
+
+  public long getIngestionTimestamp() {
+    return _ingestionTimestamp;
+  }
+
+  public void setIngestionTimestamp(long timestamp) {
+    _ingestionTimestamp = timestamp;
+  }
+
+  public void reset() {
+    _ingestionTimestamp = Long.MIN_VALUE;
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
index d731e2f..bcdca10 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
@@ -83,6 +83,10 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   private Duration _timeGranularity;
   private long _pushTime = Long.MIN_VALUE;
   private long _refreshTime = Long.MIN_VALUE;
+
+  private long _lastIndexedTime = Long.MIN_VALUE;
+  private long _latestIngestionTime = Long.MIN_VALUE;
+
   private SegmentVersion _segmentVersion;
   private boolean _hasStarTree;
   private StarTreeMetadata _starTreeMetadata;
@@ -456,6 +460,16 @@ public class SegmentMetadataImpl implements 
SegmentMetadata {
   }
 
   @Override
+  public long getLastIndexedTimestamp() {
+    return _lastIndexedTime;
+  }
+
+  @Override
+  public long getLatestIngestionTimestamp() {
+    return _latestIngestionTime;
+  }
+
+  @Override
   public boolean hasDictionary(String columnName) {
     return _columnMetadataMap.get(columnName).hasDictionary();
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
index 9357fa0..527464a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -62,6 +63,7 @@ public class MutableSegmentImplAggregateMetricsTest {
     }
 
     Map<String, Long> expectedValues = new HashMap<>();
+    StreamMessageMetadata defaultMetadata = new StreamMessageMetadata();
     Random random = new Random();
     for (int i = 0; i < NUM_ROWS; i++) {
       GenericRow row = new GenericRow();
@@ -71,7 +73,7 @@ public class MutableSegmentImplAggregateMetricsTest {
       long metricValue = random.nextInt();
       row.putField(METRIC, metricValue);
 
-      _mutableSegmentImpl.index(row);
+      _mutableSegmentImpl.index(row, defaultMetadata);
 
       // Update expected values
       String key = buildKey(row);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index 912e6cf..7e414c5 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.data.readers.RecordReader;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
@@ -73,10 +74,11 @@ public class MutableSegmentImplTest {
     _schema = config.getSchema();
     _mutableSegmentImpl = MutableSegmentImplTestUtils
         .createMutableSegmentImpl(_schema, Collections.emptySet(), 
Collections.emptySet(), false);
+    StreamMessageMetadata defaultMetadata = new StreamMessageMetadata();
     try (RecordReader recordReader = new AvroRecordReader(avroFile, _schema)) {
       GenericRow reuse = new GenericRow();
       while (recordReader.hasNext()) {
-        _mutableSegmentImpl.index(recordReader.next(reuse));
+        _mutableSegmentImpl.index(recordReader.next(reuse), defaultMetadata);
       }
     }
   }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index 1832c6c..64640a8 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -36,6 +36,7 @@ import 
org.apache.pinot.core.io.readerwriter.RealtimeIndexOffHeapMemoryManager;
 import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 
 
@@ -128,12 +129,15 @@ public class MemoryEstimator {
     // create mutable segment impl
     MutableSegmentImpl mutableSegmentImpl = new 
MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
 
+    StreamMessageMetadata messageMetadata = new StreamMessageMetadata();
     // read all rows and index them
     try (PinotSegmentRecordReader segmentRecordReader = new 
PinotSegmentRecordReader(_sampleCompletedSegment);) {
       GenericRow row = new GenericRow();
       while (segmentRecordReader.hasNext()) {
+        messageMetadata.reset();
+
         segmentRecordReader.next(row);
-        mutableSegmentImpl.index(row);
+        mutableSegmentImpl.index(row, messageMetadata);
         row.clear();
       }
     } catch (Exception e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to