This is an automated email from the ASF dual-hosted git repository.
sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3a1e386 Extend stream interfaces to expose message metadata like
ingestion timestamp (#4025)
3a1e386 is described below
commit 3a1e386e4ea96d1b8629a89e065eaac274553b6e
Author: Sunitha Beeram <[email protected]>
AuthorDate: Tue Apr 9 10:49:24 2019 -0700
Extend stream interfaces to expose message metadata like ingestion
timestamp (#4025)
* Extend stream interfaces to expose message metadata like ingestion
timestamp
* Update segment metadata interface
* Expose timestamps through segment-metadata instead of IndexSegment
* Fix tracking
* Update MessageBatch API to take stream-metadata object
* Address review comments
* Address review comments
* Minor fixes
* Update MutableSegment interface with just one index method
* Address review comments
---
.../pinot/common/segment/SegmentMetadata.java | 16 +++++++
.../realtime/HLRealtimeSegmentDataManager.java | 7 ++-
.../realtime/LLRealtimeSegmentDataManager.java | 11 ++++-
.../core/indexsegment/mutable/MutableSegment.java | 6 ++-
.../indexsegment/mutable/MutableSegmentImpl.java | 33 ++++++++++++--
.../impl/kafka/KafkaStreamLevelConsumer.java | 8 +++-
.../pinot/core/realtime/stream/MessageBatch.java | 10 ++++
.../core/realtime/stream/StreamLevelConsumer.java | 10 ++++
.../core/realtime/stream/StreamMessageDecoder.java | 36 +++++++++++++++
...velConsumer.java => StreamMessageMetadata.java} | 53 +++++++++-------------
.../core/segment/index/SegmentMetadataImpl.java | 14 ++++++
.../MutableSegmentImplAggregateMetricsTest.java | 4 +-
.../mutable/MutableSegmentImplTest.java | 4 +-
.../realtime/provisioning/MemoryEstimator.java | 6 ++-
14 files changed, 174 insertions(+), 44 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/segment/SegmentMetadata.java
b/pinot-common/src/main/java/org/apache/pinot/common/segment/SegmentMetadata.java
index 68d6ffb..9368ee2 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/segment/SegmentMetadata.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/segment/SegmentMetadata.java
@@ -73,6 +73,22 @@ public interface SegmentMetadata {
*/
long getRefreshTime();
+ /**
+ * Return the last time a record was indexed in this segment. Applicable for
MutableSegments.
+ *
+ * @return time when the last record was indexed
+ */
+ long getLastIndexedTimestamp();
+
+ /**
+ * Return the latest ingestion timestamp associated with the records indexed
in this segment.
+ * Applicable for MutableSegments.
+ *
+ * @return latest timestamp associated with indexed records, or
+ * <code>Long.MIN_VALUE</code> if the stream doesn't provide a
timestamp
+ */
+ long getLatestIngestionTimestamp();
+
boolean hasDictionary(String columnName);
boolean hasStarTree();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 745115b..a8e0bdd 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -51,6 +51,7 @@ import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -211,15 +212,17 @@ public class HLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
int numRowsErrored = 0;
GenericRow consumedRow = null;
+ StreamMessageMetadata messageMetadata = new StreamMessageMetadata();
do {
try {
consumedRow = GenericRow.createOrReuseRow(consumedRow);
- consumedRow = _streamLevelConsumer.next(consumedRow);
+ messageMetadata.reset();
+ consumedRow = _streamLevelConsumer.next(consumedRow,
messageMetadata);
if (consumedRow != null) {
GenericRow transformedRow =
_recordTransformer.transform(consumedRow);
if (transformedRow != null) {
- notFull = realtimeSegment.index(transformedRow);
+ notFull = realtimeSegment.index(transformedRow,
messageMetadata);
exceptionSleepMillis = 50L;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 50ee54e..9f23f89 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -67,6 +67,7 @@ import
org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
import org.apache.pinot.core.realtime.stream.TransientConsumerException;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
@@ -408,6 +409,8 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
int streamMessageCount = 0;
boolean canTakeMore = true;
GenericRow decodedRow = null;
+ StreamMessageMetadata msgMetadata = new StreamMessageMetadata();
+
for (int index = 0; index < messagesAndOffsets.getMessageCount(); index++)
{
if (_shouldStop || endCriteriaReached()) {
break;
@@ -435,10 +438,14 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// Index each message
decodedRow = GenericRow.createOrReuseRow(decodedRow);
+ msgMetadata.reset();
+ // retrieve metadata from the message batch if available
+ // this can be overridden by the decoder if there is a better indicator
in the message payload
+ messagesAndOffsets.getMetadataAtIndex(index, msgMetadata);
decodedRow = _messageDecoder
.decode(messagesAndOffsets.getMessageAtIndex(index),
messagesAndOffsets.getMessageOffsetAtIndex(index),
- messagesAndOffsets.getMessageLengthAtIndex(index), decodedRow);
+ messagesAndOffsets.getMessageLengthAtIndex(index), decodedRow,
msgMetadata);
if (decodedRow != null) {
try {
@@ -454,7 +461,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
realtimeRowsDroppedMeter);
}
- canTakeMore = _realtimeSegment.index(transformedRow);
+ canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
} catch (Exception e) {
segmentLogger.error("Caught exception while transforming the record:
{}", decodedRow, e);
_numRowsErrored++;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
index 10f915b..9ab8a68 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
@@ -20,17 +20,19 @@ package org.apache.pinot.core.indexsegment.mutable;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
public interface MutableSegment extends IndexSegment {
/**
- * Indexes a record into the segment.
+ * Indexes a record into the segment with optionally provided metadata.
*
* @param row Record represented as a {@link GenericRow}
+ * @param msgMetadata the metadata associated with the message
* @return Whether the segment can take more records (i.e. it is not yet
full)
*/
- boolean index(GenericRow row);
+ boolean index(GenericRow row, StreamMessageMetadata msgMetadata);
/**
* Returns the number of records already indexed into the segment.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 1b554e8..d7abb9e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -42,6 +42,7 @@ import
org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionary;
import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
import
org.apache.pinot.core.realtime.impl.invertedindex.RealtimeInvertedIndexReader;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;
@@ -94,6 +95,11 @@ public class MutableSegmentImpl implements MutableSegment {
private volatile long _maxTime = Long.MIN_VALUE;
private final int _numKeyColumns;
+ // default message metadata
+ private static final StreamMessageMetadata _defaultMetadata = new
StreamMessageMetadata();
+ private long _lastIndexedTimestamp = Long.MIN_VALUE;
+ private long _latestIngestionTimestamp = Long.MIN_VALUE;
+
public MutableSegmentImpl(RealtimeSegmentConfig config) {
_segmentName = config.getSegmentName();
_schema = config.getSchema();
@@ -109,6 +115,16 @@ public class MutableSegmentImpl implements MutableSegment {
// In realtime total docs and total raw docs are the same currently.
return _numDocsIndexed;
}
+
+ @Override
+ public long getLastIndexedTimestamp() {
+ return _lastIndexedTimestamp;
+ }
+
+ @Override
+ public long getLatestIngestionTimestamp() {
+ return _latestIngestionTimestamp;
+ }
};
_offHeap = config.isOffHeap();
@@ -195,7 +211,9 @@ public class MutableSegmentImpl implements MutableSegment {
}
@Override
- public boolean index(GenericRow row) {
+ public boolean index(GenericRow row, StreamMessageMetadata msgMetadata) {
+
+ boolean canTakeMore = false;
// Update dictionary first
Map<String, Object> dictIdMap = updateDictionary(row);
@@ -210,14 +228,23 @@ public class MutableSegmentImpl implements MutableSegment
{
// Add forward and inverted indices for new document.
addForwardIndex(row, docId, dictIdMap);
addInvertedIndex(docId, dictIdMap);
+
// Update number of document indexed at last to make the latest record
queryable
- return _numDocsIndexed++ < _capacity;
+ canTakeMore = _numDocsIndexed++ < _capacity;
} else {
Preconditions
.checkState(_aggregateMetrics, "Invalid document-id during indexing:
" + docId + " expected: " + numDocs);
// Update metrics for existing document.
- return aggregateMetrics(row, docId);
+ canTakeMore = aggregateMetrics(row, docId);
+ }
+
+ _lastIndexedTimestamp = System.currentTimeMillis();
+
+ if (msgMetadata != null) {
+ _latestIngestionTimestamp = Math.max(_latestIngestionTimestamp,
msgMetadata.getIngestionTimestamp());
}
+
+ return canTakeMore;
}
private Map<String, Object> updateDictionary(GenericRow row) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
index ce2b632..d14fe62 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
@@ -30,6 +30,7 @@ import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,9 +84,14 @@ public class KafkaStreamLevelConsumer implements
StreamLevelConsumer {
@Override
public GenericRow next(GenericRow destination) {
+ return next(destination, null);
+ }
+
+ @Override
+ public GenericRow next(GenericRow destination, StreamMessageMetadata
metadata) {
if (kafkaIterator.hasNext()) {
try {
- destination = _messageDecoder.decode(kafkaIterator.next().message(),
destination);
+ destination = _messageDecoder.decode(kafkaIterator.next().message(),
destination, metadata);
tableAndStreamRowsConsumed = _serverMetrics
.addMeteredTableValue(_tableAndStreamName,
ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
tableAndStreamRowsConsumed);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
index 86c722b..a575cff 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
@@ -58,6 +58,16 @@ public interface MessageBatch<T> {
int getMessageLengthAtIndex(int index);
/**
+ * Populates the given metadata object with the metadata associated with the
message at a particular index. This typically includes the timestamp
+ * when the message was ingested by the upstream stream-provider and other
relevant metadata.
+ *
+ */
+ default void getMetadataAtIndex(int index, StreamMessageMetadata metadata) {
+ // update metadata as required - default implementation does nothing
+ return;
+ }
+
+ /**
* Returns the offset of the next message.
* @param index
* @return
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
index fc8cc19..6022f77 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
@@ -45,6 +45,16 @@ public interface StreamLevelConsumer {
GenericRow next(GenericRow destination);
/**
+ * Get next row from the stream and decode it into a generic row
+ * @param destination the decoded generic row
+ * @param metadata the metadata associated with the decoded stream if any
+ * @return the decoded generic row
+ */
+ default GenericRow next(GenericRow destination, StreamMessageMetadata
metadata) {
+ return next(destination);
+ }
+
+ /**
* Commit the offsets consumed so far
* The next call to consume should exclude all events consumed before the
commit was called, and start from newer events not yet consumed
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
index d95bb7f..9eef8d5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
@@ -58,4 +58,40 @@ public interface StreamMessageDecoder<T> {
* @return A new row decoded from the buffer
*/
GenericRow decode(T payload, int offset, int length, GenericRow destination);
+
+ /**
+ * Decodes the payload received into a generic row and optionally populates
any metadata associated
+ * with the message.
+ *
+ * This interface default method is provided for backward
compatibility and should be
+ * overridden by implementations that can provide meaningful metadata.
+ *
+ * @param payload the message payload
+ * @param destination The {@link GenericRow} to write the decoded row into
+ * @param metadata The {@link StreamMessageMetadata} associated with the
message
+ * @return A new row decoded from the buffer
+ */
+ default GenericRow decode(T payload, GenericRow destination,
StreamMessageMetadata metadata) {
+ // by default, invoke the non-metadata based method for backwards
compatibility
+ return decode(payload, destination);
+ }
+
+ /**
+ * Decodes the payload received into a generic row and optionally populates
any metadata associated
+ * with the message.
+ *
+ * This interface default method is provided for backward
compatibility and should be
+ * overridden by implementations that can provide meaningful metadata.
+ *
+ * @param payload the message payload
+ * @param offset The offset into the array from which the row contents starts
+ * @param length The length of the row contents in bytes
+ * @param destination The {@link GenericRow} to write the decoded row into
+ * @param metadata The {@link StreamMessageMetadata} associated with the
message
+ * @return A new row decoded from the buffer
+ */
+ default GenericRow decode(T payload, int offset, int length, GenericRow
destination, StreamMessageMetadata metadata) {
+ // by default, invoke the non-metadata based method for backwards
compatibility
+ return decode(payload, offset, length, destination);
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
similarity index 55%
copy from
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
copy to
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
index fc8cc19..30c0859 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
@@ -20,40 +20,31 @@ package org.apache.pinot.core.realtime.stream;
import org.apache.pinot.annotations.InterfaceAudience;
import org.apache.pinot.annotations.InterfaceStability;
-import org.apache.pinot.core.data.GenericRow;
/**
- * Interface for a consumer that consumes at stream level and is unaware of
any partitions of the stream
+ * A class that provides metadata associated with a message of a stream,
e.g.,
+ * the ingestion timestamp of the message.
*/
@InterfaceAudience.Public
-@InterfaceStability.Stable
-public interface StreamLevelConsumer {
-
- /**
- * Initialize and start the stream level consumer
- * @throws Exception
- */
- void start()
- throws Exception;
-
- /**
- * Get next row from the stream and decode it into a generic row
- * @param destination
- * @return
- */
- GenericRow next(GenericRow destination);
-
- /**
- * Commit the offsets consumed so far
- * The next call to consume should exclude all events consumed before the
commit was called, and start from newer events not yet consumed
- */
- void commit();
-
- /**
- * Shutdown the stream consumer
- * @throws Exception
- */
- void shutdown()
- throws Exception;
+@InterfaceStability.Evolving
+public class StreamMessageMetadata {
+
+ private long _ingestionTimestamp = Long.MIN_VALUE;
+
+ public StreamMessageMetadata() {
+
+ }
+
+ public long getIngestionTimestamp() {
+ return _ingestionTimestamp;
+ }
+
+ public void setIngestionTimestamp(long timestamp) {
+ _ingestionTimestamp = timestamp;
+ }
+
+ public void reset() {
+ _ingestionTimestamp = Long.MIN_VALUE;
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
index d731e2f..bcdca10 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
@@ -83,6 +83,10 @@ public class SegmentMetadataImpl implements SegmentMetadata {
private Duration _timeGranularity;
private long _pushTime = Long.MIN_VALUE;
private long _refreshTime = Long.MIN_VALUE;
+
+ private long _lastIndexedTime = Long.MIN_VALUE;
+ private long _latestIngestionTime = Long.MIN_VALUE;
+
private SegmentVersion _segmentVersion;
private boolean _hasStarTree;
private StarTreeMetadata _starTreeMetadata;
@@ -456,6 +460,16 @@ public class SegmentMetadataImpl implements
SegmentMetadata {
}
@Override
+ public long getLastIndexedTimestamp() {
+ return _lastIndexedTime;
+ }
+
+ @Override
+ public long getLatestIngestionTimestamp() {
+ return _latestIngestionTime;
+ }
+
+ @Override
public boolean hasDictionary(String columnName) {
return _columnMetadataMap.get(columnName).hasDictionary();
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
index 9357fa0..527464a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang.RandomStringUtils;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -62,6 +63,7 @@ public class MutableSegmentImplAggregateMetricsTest {
}
Map<String, Long> expectedValues = new HashMap<>();
+ StreamMessageMetadata defaultMetadata = new StreamMessageMetadata();
Random random = new Random();
for (int i = 0; i < NUM_ROWS; i++) {
GenericRow row = new GenericRow();
@@ -71,7 +73,7 @@ public class MutableSegmentImplAggregateMetricsTest {
long metricValue = random.nextInt();
row.putField(METRIC, metricValue);
- _mutableSegmentImpl.index(row);
+ _mutableSegmentImpl.index(row, defaultMetadata);
// Update expected values
String key = buildKey(row);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index 912e6cf..7e414c5 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.data.readers.RecordReader;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.segment.index.readers.Dictionary;
@@ -73,10 +74,11 @@ public class MutableSegmentImplTest {
_schema = config.getSchema();
_mutableSegmentImpl = MutableSegmentImplTestUtils
.createMutableSegmentImpl(_schema, Collections.emptySet(),
Collections.emptySet(), false);
+ StreamMessageMetadata defaultMetadata = new StreamMessageMetadata();
try (RecordReader recordReader = new AvroRecordReader(avroFile, _schema)) {
GenericRow reuse = new GenericRow();
while (recordReader.hasNext()) {
- _mutableSegmentImpl.index(recordReader.next(reuse));
+ _mutableSegmentImpl.index(recordReader.next(reuse), defaultMetadata);
}
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index 1832c6c..64640a8 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -36,6 +36,7 @@ import
org.apache.pinot.core.io.readerwriter.RealtimeIndexOffHeapMemoryManager;
import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
@@ -128,12 +129,15 @@ public class MemoryEstimator {
// create mutable segment impl
MutableSegmentImpl mutableSegmentImpl = new
MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
+ StreamMessageMetadata messageMetadata = new StreamMessageMetadata();
// read all rows and index them
try (PinotSegmentRecordReader segmentRecordReader = new
PinotSegmentRecordReader(_sampleCompletedSegment);) {
GenericRow row = new GenericRow();
while (segmentRecordReader.hasNext()) {
+ messageMetadata.reset();
+
segmentRecordReader.next(row);
- mutableSegmentImpl.index(row);
+ mutableSegmentImpl.index(row, messageMetadata);
row.clear();
}
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]