This is an automated email from the ASF dual-hosted git repository.
sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 18d628b Track "freshness" timestamp across consuming segments (#3979)
18d628b is described below
commit 18d628b8fda8cf27c58854b64b8d4ee177c5334b
Author: Sunitha Beeram <[email protected]>
AuthorDate: Wed May 8 21:47:58 2019 -0700
Track "freshness" timestamp across consuming segments (#3979)
* Track Indexed timestamp across consuming segments
* Fix when indexing time is updated
* Update based on PR#4025
* Fix tests
* Simplify interfaces
* Update interface annotation
* Address review comments
* Fix names in BrokerResponseNative; Rely on instanceof
* Address review comments
* Fix name
* Fix test
---
.../apache/pinot/common/metadata/RowMetadata.java | 30 ++++++------------
.../response/broker/BrokerResponseNative.java | 27 +++++++++++++++-
.../org/apache/pinot/common/utils/DataTable.java | 2 ++
.../realtime/HLRealtimeSegmentDataManager.java | 7 ++--
.../realtime/LLRealtimeSegmentDataManager.java | 7 ++--
.../core/indexsegment/mutable/MutableSegment.java | 7 ++--
.../indexsegment/mutable/MutableSegmentImpl.java | 23 +++++++-------
.../query/executor/ServerQueryExecutorV1Impl.java | 37 +++++++++++++++++++++-
.../core/query/reduce/BrokerReduceService.java | 19 +++++++++++
.../impl/kafka/KafkaStreamLevelConsumer.java | 7 ++--
.../pinot/core/realtime/stream/MessageBatch.java | 7 ++--
.../core/realtime/stream/StreamLevelConsumer.java | 11 +------
.../core/realtime/stream/StreamMessageDecoder.java | 36 ---------------------
.../realtime/stream/StreamMessageMetadata.java | 25 +++++----------
.../MutableSegmentImplAggregateMetricsTest.java | 2 +-
.../mutable/MutableSegmentImplTest.java | 15 ++++++++-
.../tests/LLCRealtimeClusterIntegrationTest.java | 4 +++
.../realtime/provisioning/MemoryEstimator.java | 5 +--
18 files changed, 148 insertions(+), 123 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
similarity index 64%
copy from
pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
copy to
pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
index 30c0859..4e67a05 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/RowMetadata.java
@@ -16,35 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.realtime.stream;
+package org.apache.pinot.common.metadata;
import org.apache.pinot.annotations.InterfaceAudience;
import org.apache.pinot.annotations.InterfaceStability;
/**
- * A class that provides metadata associated with the message of a stream, for e.g.,
- * ingestion-timestamp of the message.
+ * A class that provides relevant row-level metadata for rows ingested into a segment.
+ *
+ * Currently this is relevant for rows ingested into a mutable segment.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class StreamMessageMetadata {
-
- private long _ingestionTimestamp = Long.MIN_VALUE;
-
- public StreamMessageMetadata() {
-
- }
-
- public long getIngestionTimestamp() {
- return _ingestionTimestamp;
- }
-
- public void setIngestionTimestamp(long timestamp) {
- _ingestionTimestamp = timestamp;
- }
+public interface RowMetadata {
- public void reset() {
- _ingestionTimestamp = Long.MIN_VALUE;
- }
+ /**
+ * Return the ingestion timestamp of the row.
+ */
+ long getIngestionTimeMs();
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 94c6d46..214d942 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -39,7 +39,7 @@ import org.apache.pinot.common.utils.JsonUtils;
*
* Supports serialization via JSON.
*/
-@JsonPropertyOrder({"selectionResults", "aggregationResults", "exceptions",
"numServersQueried", "numServersResponded", "numSegmentsQueried",
"numSegmentsProcessed", "numSegmentsMatched", "numDocsScanned",
"numEntriesScannedInFilter", "numEntriesScannedPostFilter",
"numGroupsLimitReached", "totalDocs", "timeUsedMs", "segmentStatistics",
"traceInfo"})
+@JsonPropertyOrder({"selectionResults", "aggregationResults", "exceptions",
"numServersQueried", "numServersResponded", "numSegmentsQueried",
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
"numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
"numGroupsLimitReached", "totalDocs", "timeUsedMs", "segmentStatistics",
"traceInfo"})
public class BrokerResponseNative implements BrokerResponse {
public static final BrokerResponseNative EMPTY_RESULT =
BrokerResponseNative.empty();
public static final BrokerResponseNative NO_TABLE_RESULT =
@@ -53,6 +53,10 @@ public class BrokerResponseNative implements BrokerResponse {
private long _numSegmentsQueried = 0L;
private long _numSegmentsProcessed = 0L;
private long _numSegmentsMatched = 0L;
+ private long _numConsumingSegmentsQueried = 0L;
+ // the timestamp indicating the freshness of the data queried in consuming segments.
+ // This can be ingestion timestamp if provided by the stream, or the last index time
+ private long _minConsumingFreshnessTimeMs = 0L;
private long _totalDocs = 0L;
private boolean _numGroupsLimitReached = false;
@@ -203,6 +207,27 @@ public class BrokerResponseNative implements
BrokerResponse {
_numSegmentsMatched = numSegmentsMatched;
}
+ @JsonProperty("numConsumingSegmentsQueried")
+ public long getNumConsumingSegmentsQueried() {
+ return _numConsumingSegmentsQueried;
+ }
+
+ @JsonProperty("numConsumingSegmentsQueried")
+ public void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried)
{
+ _numConsumingSegmentsQueried = numConsumingSegmentsQueried;
+ }
+
+ @JsonProperty("minConsumingFreshnessTimeMs")
+ public long getMinConsumingFreshnessTimeMs() {
+ return _minConsumingFreshnessTimeMs;
+ }
+
+ @JsonProperty("minConsumingFreshnessTimeMs")
+ public void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs)
{
+ _minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
+ }
+
+
@JsonProperty("totalDocs")
@Override
public long getTotalDocs() {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 1a6c49d..5383a3c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -35,6 +35,8 @@ public interface DataTable {
String NUM_SEGMENTS_QUERIED = "numSegmentsQueried";
String NUM_SEGMENTS_PROCESSED = "numSegmentsProcessed";
String NUM_SEGMENTS_MATCHED = "numSegmentsMatched";
+ String NUM_CONSUMING_SEGMENTS_QUERIED = "numConsumingSegmentsQueried";
+ String MIN_CONSUMING_FRESHNESS_TIME_MS = "minConsumingFreshnessTimeMs";
String TOTAL_DOCS_METADATA_KEY = "totalDocs";
String NUM_GROUPS_LIMIT_REACHED_KEY = "numGroupsLimitReached";
String TIME_USED_MS_METADATA_KEY = "timeUsedMs";
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index a8e0bdd..45e0128 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -212,17 +212,16 @@ public class HLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
int numRowsErrored = 0;
GenericRow consumedRow = null;
- StreamMessageMetadata messageMetadata = new StreamMessageMetadata();
do {
try {
consumedRow = GenericRow.createOrReuseRow(consumedRow);
- messageMetadata.reset();
- consumedRow = _streamLevelConsumer.next(consumedRow,
messageMetadata);
+ consumedRow = _streamLevelConsumer.next(consumedRow);
if (consumedRow != null) {
GenericRow transformedRow =
_recordTransformer.transform(consumedRow);
if (transformedRow != null) {
- notFull = realtimeSegment.index(transformedRow,
messageMetadata);
+ // we currently do not get ingestion data through stream-consumer
+ notFull = realtimeSegment.index(transformedRow, null);
exceptionSleepMillis = 50L;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 9f23f89..7fcbd9c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -40,6 +40,7 @@ import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.data.StarTreeIndexSpec;
+import org.apache.pinot.common.metadata.RowMetadata;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -409,7 +410,6 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
int streamMessageCount = 0;
boolean canTakeMore = true;
GenericRow decodedRow = null;
- StreamMessageMetadata msgMetadata = new StreamMessageMetadata();
for (int index = 0; index < messagesAndOffsets.getMessageCount(); index++)
{
if (_shouldStop || endCriteriaReached()) {
@@ -438,14 +438,13 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// Index each message
decodedRow = GenericRow.createOrReuseRow(decodedRow);
- msgMetadata.reset();
// retrieve metadata from the message batch if available
// this can be overridden by the decoder if there is a better indicator in the message payload
- messagesAndOffsets.getMetadataAtIndex(index, msgMetadata);
+ RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index);
decodedRow = _messageDecoder
.decode(messagesAndOffsets.getMessageAtIndex(index),
messagesAndOffsets.getMessageOffsetAtIndex(index),
- messagesAndOffsets.getMessageLengthAtIndex(index), decodedRow,
msgMetadata);
+ messagesAndOffsets.getMessageLengthAtIndex(index), decodedRow);
if (decodedRow != null) {
try {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
index 9ab8a68..316f265 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegment.java
@@ -18,9 +18,10 @@
*/
package org.apache.pinot.core.indexsegment.mutable;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metadata.RowMetadata;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
public interface MutableSegment extends IndexSegment {
@@ -29,10 +30,10 @@ public interface MutableSegment extends IndexSegment {
* Indexes a record into the segment with optionally provided metadata.
*
* @param row Record represented as a {@link GenericRow}
- * @param msgMetadata the metadata associated with the message
+ * @param rowMetadata the metadata associated with the message
* @return Whether the segment is full (i.e. cannot index more record into
it)
*/
- boolean index(GenericRow row, StreamMessageMetadata msgMetadata);
+ boolean index(GenericRow row, @Nullable RowMetadata rowMetadata);
/**
* Returns the number of records already indexed into the segment.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index d7abb9e..a92ecfc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -26,9 +26,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.RowMetadata;
import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.core.data.GenericRow;
@@ -96,9 +98,9 @@ public class MutableSegmentImpl implements MutableSegment {
private final int _numKeyColumns;
// default message metadata
- private static final StreamMessageMetadata _defaultMetadata = new
StreamMessageMetadata();
- private long _lastIndexedTimestamp = Long.MIN_VALUE;
- private long _latestIngestionTimestamp = Long.MIN_VALUE;
+ private static final StreamMessageMetadata _defaultMetadata = new
StreamMessageMetadata(System.currentTimeMillis());
+ private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
+ private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;
public MutableSegmentImpl(RealtimeSegmentConfig config) {
_segmentName = config.getSegmentName();
@@ -118,12 +120,12 @@ public class MutableSegmentImpl implements MutableSegment
{
@Override
public long getLastIndexedTimestamp() {
- return _lastIndexedTimestamp;
+ return _lastIndexedTimeMs;
}
@Override
public long getLatestIngestionTimestamp() {
- return _latestIngestionTimestamp;
+ return _latestIngestionTimeMs;
}
};
@@ -211,9 +213,9 @@ public class MutableSegmentImpl implements MutableSegment {
}
@Override
- public boolean index(GenericRow row, StreamMessageMetadata msgMetadata) {
+ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
- boolean canTakeMore = false;
+ boolean canTakeMore;
// Update dictionary first
Map<String, Object> dictIdMap = updateDictionary(row);
@@ -238,12 +240,11 @@ public class MutableSegmentImpl implements MutableSegment
{
canTakeMore = aggregateMetrics(row, docId);
}
- _lastIndexedTimestamp = System.currentTimeMillis();
+ _lastIndexedTimeMs = System.currentTimeMillis();
- if (msgMetadata != null) {
- _latestIngestionTimestamp = Math.max(_latestIngestionTimestamp,
msgMetadata.getIngestionTimestamp());
+ if (rowMetadata != null) {
+ _latestIngestionTimeMs = Math.max(_latestIngestionTimeMs,
rowMetadata.getIngestionTimeMs());
}
-
return canTakeMore;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 4ab000c..3058539 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
@@ -41,6 +42,7 @@ import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.plan.maker.PlanMaker;
@@ -149,6 +151,34 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
TraceContext.register(requestId);
}
+ int numConsumingSegmentsQueried = 0;
+ long minIndexTimeMs = Long.MAX_VALUE;
+ long minIngestionTimeMs = Long.MAX_VALUE;
+ // gather stats for realtime consuming segments
+ for (SegmentDataManager segmentMgr : segmentDataManagers) {
+ if (segmentMgr.getSegment() instanceof MutableSegment) {
+ numConsumingSegmentsQueried += 1;
+ SegmentMetadata metadata =
segmentMgr.getSegment().getSegmentMetadata();
+ long indexedTime = metadata.getLastIndexedTimestamp();
+ if (indexedTime != Long.MIN_VALUE && indexedTime < minIndexTimeMs) {
+ minIndexTimeMs = metadata.getLastIndexedTimestamp();
+ }
+ long ingestionTime = metadata.getLatestIngestionTimestamp();
+ if (ingestionTime != Long.MIN_VALUE && ingestionTime <
minIngestionTimeMs) {
+ minIngestionTimeMs = ingestionTime;
+ }
+ }
+ }
+
+ long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+ if (numConsumingSegmentsQueried > 0) {
+ if (minIngestionTimeMs == Long.MAX_VALUE) {
+ LOGGER.debug("Did not find valid ingestionTimestamp across consuming
segments! Using indexTime instead");
+ minConsumingFreshnessTimeMs = minIndexTimeMs;
+ }
+ LOGGER.debug("Querying {} consuming segments with min
minConsumingFreshnessTimeMs {}", numConsumingSegmentsQueried,
minConsumingFreshnessTimeMs);
+ }
+
DataTable dataTable = null;
try {
TimerContext.Timer segmentPruneTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
@@ -219,13 +249,18 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
// Currently, given the deleted segments cache is in-memory only, a server restart will reset it
// We might end up sending partial-response metadata in such cases. It appears that the likelihood of
// this occurence is low; ie, segment has to be retained out and the server must be restarted while the
- // broker view is still behind. We would however like to validate that and/or conf control this based on
+ // broker view is still behind. We would however like to validate that and/or conf control this based on
// data.
/*dataTable.addException(QueryException.getException(QueryException.SEGMENTS_MISSING_ERROR,
"Could not find " + missingSegments + " segments on the server"));*/
_serverMetrics.addMeteredTableValue(tableNameWithType,
ServerMeter.NUM_MISSING_SEGMENTS, missingSegments);
}
+ if (numConsumingSegmentsQueried > 0) {
+ dataTable.getMetadata().put(DataTable.NUM_CONSUMING_SEGMENTS_QUERIED,
Integer.toString(numConsumingSegmentsQueried));
+ dataTable.getMetadata().put(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS,
Long.toString(minConsumingFreshnessTimeMs));
+ }
+
LOGGER.debug("Query processing time for request Id - {}: {}", requestId,
queryProcessingTime);
LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId,
dataTable);
return dataTable;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index a31ad30..d539bf7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -82,6 +82,8 @@ public class BrokerReduceService implements
ReduceService<BrokerResponseNative>
long numSegmentsQueried = 0L;
long numSegmentsProcessed = 0L;
long numSegmentsMatched = 0L;
+ long numConsumingSegmentsQueried = 0L;
+ long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
long numTotalRawDocs = 0L;
boolean numGroupsLimitReached = false;
@@ -136,6 +138,16 @@ public class BrokerReduceService implements
ReduceService<BrokerResponseNative>
numSegmentsMatched += Long.parseLong(numSegmentsMatchedString);
}
+ String numConsumingString =
metadata.get(DataTable.NUM_CONSUMING_SEGMENTS_QUERIED);
+ if (numConsumingString != null) {
+ numConsumingSegmentsQueried += Long.parseLong(numConsumingString);
+ }
+
+ String minConsumingIndexTsString =
metadata.get(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS);
+ if (minConsumingIndexTsString != null) {
+ minConsumingFreshnessTimeMs =
Math.min(Long.parseLong(minConsumingIndexTsString),
minConsumingFreshnessTimeMs);
+ }
+
String numTotalRawDocsString =
metadata.get(DataTable.TOTAL_DOCS_METADATA_KEY);
if (numTotalRawDocsString != null) {
numTotalRawDocs += Long.parseLong(numTotalRawDocsString);
@@ -168,6 +180,13 @@ public class BrokerReduceService implements
ReduceService<BrokerResponseNative>
brokerResponseNative.setNumSegmentsMatched(numSegmentsMatched);
brokerResponseNative.setTotalDocs(numTotalRawDocs);
brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached);
+ if (numConsumingSegmentsQueried > 0) {
+ if (minConsumingFreshnessTimeMs == Long.MAX_VALUE) {
+ LOGGER.error("Invalid lastIndexedTimestamp across {} consuming
segments", numConsumingSegmentsQueried);
+ }
+
brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsQueried);
+
brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs);
+ }
// Update broker metrics.
String tableName = brokerRequest.getQuerySource().getTableName();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
index d14fe62..29a0cf8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamLevelConsumer.java
@@ -22,6 +22,7 @@ import com.yammer.metrics.core.Meter;
import kafka.consumer.ConsumerIterator;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.RowMetadata;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -84,14 +85,10 @@ public class KafkaStreamLevelConsumer implements
StreamLevelConsumer {
@Override
public GenericRow next(GenericRow destination) {
- return next(destination, null);
- }
- @Override
- public GenericRow next(GenericRow destination, StreamMessageMetadata
metadata) {
if (kafkaIterator.hasNext()) {
try {
- destination = _messageDecoder.decode(kafkaIterator.next().message(),
destination, metadata);
+ destination = _messageDecoder.decode(kafkaIterator.next().message(),
destination);
tableAndStreamRowsConsumed = _serverMetrics
.addMeteredTableValue(_tableAndStreamName,
ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
tableAndStreamRowsConsumed);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
index a575cff..09849ca 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/MessageBatch.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.realtime.stream;
import org.apache.pinot.annotations.InterfaceAudience;
import org.apache.pinot.annotations.InterfaceStability;
+import org.apache.pinot.common.metadata.RowMetadata;
/**
@@ -60,11 +61,9 @@ public interface MessageBatch<T> {
/**
* Returns the metadata associated with the message at a particular index. This typically includes the timestamp
* when the message was ingested by the upstream stream-provider and other relevant metadata.
- *
*/
- default void getMetadataAtIndex(int index, StreamMessageMetadata metadata) {
- // update metadata as required - default implementation does nothing
- return;
+ default RowMetadata getMetadataAtIndex(int index) {
+ return null;
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
index 6022f77..6b7285b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamLevelConsumer.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.realtime.stream;
import org.apache.pinot.annotations.InterfaceAudience;
import org.apache.pinot.annotations.InterfaceStability;
+import org.apache.pinot.common.metadata.RowMetadata;
import org.apache.pinot.core.data.GenericRow;
@@ -45,16 +46,6 @@ public interface StreamLevelConsumer {
GenericRow next(GenericRow destination);
/**
- * Get next row from the stream and decode it into a generic row
- * @param destination the decoded generic row
- * @param metadata the metadata associated with the decoded stream if any
- * @return the decoded generic row
- */
- default GenericRow next(GenericRow destination, StreamMessageMetadata
metadata) {
- return next(destination);
- }
-
- /**
* Commit the offsets consumed so far
* The next call to consume should exclude all events consumed before the commit was called, and start from newer events not yet consumed
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
index 9eef8d5..d95bb7f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageDecoder.java
@@ -58,40 +58,4 @@ public interface StreamMessageDecoder<T> {
* @return A new row decoded from the buffer
*/
GenericRow decode(T payload, int offset, int length, GenericRow destination);
-
- /**
- * Decodes the payload received into a generic row and optionally populates
any metadata associated
- * with the message.
- *
- * This is an interface default method is provided for backward
compatibility and should be
- * overridden by implementations that can provide meaningful metadata.
- *
- * @param payload the message payload
- * @param destination The {@link GenericRow} to write the decoded row into
- * @param metadata The {@link StreamMessageMetadata} associated with the
message
- * @return A new row decoded from the buffer
- */
- default GenericRow decode(T payload, GenericRow destination,
StreamMessageMetadata metadata) {
- // by default, invoke the non-metadata based method for backwards
compatibility
- return decode(payload, destination);
- }
-
- /**
- * Decodes the payload received into a generic row and optionally populates
any metadata associated
- * with the message.
- *
- * This is an interface default method is provided for backward
compatibility and should be
- * overridden by implementations that can provide meaningful metadata.
- *
- * @param payload the message payload
- * @param offset The offset into the array from which the row contents starts
- * @param length The length of the row contents in bytes
- * @param destination The {@link GenericRow} to write the decoded row into
- * @param metadata The {@link StreamMessageMetadata} associated with the
message
- * @return A new row decoded from the buffer
- */
- default GenericRow decode(T payload, int offset, int length, GenericRow
destination, StreamMessageMetadata metadata) {
- // by default, invoke the non-metadata based method for backwards
compatibility
- return decode(payload, offset, length, destination);
- }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
index 30c0859..0b3ad84 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamMessageMetadata.java
@@ -18,33 +18,24 @@
*/
package org.apache.pinot.core.realtime.stream;
-import org.apache.pinot.annotations.InterfaceAudience;
-import org.apache.pinot.annotations.InterfaceStability;
+import org.apache.pinot.common.metadata.RowMetadata;
/**
* A class that provides metadata associated with the message of a stream, for e.g.,
* ingestion-timestamp of the message.
*/
[email protected]
[email protected]
-public class StreamMessageMetadata {
+public class StreamMessageMetadata implements RowMetadata {
- private long _ingestionTimestamp = Long.MIN_VALUE;
+ private final long _ingestionTimeMs;
- public StreamMessageMetadata() {
-
- }
-
- public long getIngestionTimestamp() {
- return _ingestionTimestamp;
+ public StreamMessageMetadata(long ingestionTimeMs) {
+ _ingestionTimeMs = ingestionTimeMs;
}
- public void setIngestionTimestamp(long timestamp) {
- _ingestionTimestamp = timestamp;
+ @Override
+ public long getIngestionTimeMs() {
+ return _ingestionTimeMs;
}
- public void reset() {
- _ingestionTimestamp = Long.MIN_VALUE;
- }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
index 527464a..ceed347 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
@@ -63,7 +63,7 @@ public class MutableSegmentImplAggregateMetricsTest {
}
Map<String, Long> expectedValues = new HashMap<>();
- StreamMessageMetadata defaultMetadata = new StreamMessageMetadata();
+ StreamMessageMetadata defaultMetadata = new
StreamMessageMetadata(System.currentTimeMillis());
Random random = new Random();
for (int i = 0; i < NUM_ROWS; i++) {
GenericRow row = new GenericRow();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index 7e414c5..af1a0ab 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -54,6 +54,9 @@ public class MutableSegmentImplTest {
private Schema _schema;
private MutableSegmentImpl _mutableSegmentImpl;
private ImmutableSegment _immutableSegment;
+ private long _lastIndexedTs;
+ private long _lastIngestionTimeMs;
+ private long _startTimeMs;
@BeforeClass
public void setUp()
@@ -74,11 +77,14 @@ public class MutableSegmentImplTest {
_schema = config.getSchema();
_mutableSegmentImpl = MutableSegmentImplTestUtils
.createMutableSegmentImpl(_schema, Collections.emptySet(),
Collections.emptySet(), false);
- StreamMessageMetadata defaultMetadata = new StreamMessageMetadata();
+ _lastIngestionTimeMs = System.currentTimeMillis();
+ StreamMessageMetadata defaultMetadata = new
StreamMessageMetadata(_lastIngestionTimeMs);
+ _startTimeMs = System.currentTimeMillis();
try (RecordReader recordReader = new AvroRecordReader(avroFile, _schema)) {
GenericRow reuse = new GenericRow();
while (recordReader.hasNext()) {
_mutableSegmentImpl.index(recordReader.next(reuse), defaultMetadata);
+ _lastIndexedTs = System.currentTimeMillis();
}
}
}
@@ -89,6 +95,13 @@ public class MutableSegmentImplTest {
SegmentMetadata expectedSegmentMetadata =
_immutableSegment.getSegmentMetadata();
Assert.assertEquals(actualSegmentMetadata.getTotalDocs(),
expectedSegmentMetadata.getTotalDocs());
+ // assert that the last indexed timestamp is close to what we expect
+ long actualTs =
_mutableSegmentImpl.getSegmentMetadata().getLastIndexedTimestamp();
+ Assert.assertTrue(actualTs >= _startTimeMs);
+ Assert.assertTrue(actualTs <= _lastIndexedTs);
+
+
Assert.assertEquals(_mutableSegmentImpl.getSegmentMetadata().getLatestIngestionTimestamp(),
_lastIngestionTimeMs);
+
for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
String column = fieldSpec.getName();
DataSourceMetadata actualDataSourceMetadata =
_mutableSegmentImpl.getDataSource(column).getDataSourceMetadata();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 87f57c54..14b0c2c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -47,6 +47,7 @@ public class LLCRealtimeClusterIntegrationTest extends
RealtimeClusterIntegratio
public final boolean _isDirectAlloc = RANDOM.nextBoolean();
public final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
+ private final long _startTime = System.currentTimeMillis();
private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
"SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
@@ -131,6 +132,9 @@ public class LLCRealtimeClusterIntegrationTest extends
RealtimeClusterIntegratio
JsonNode queryResponse =
postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
// Total docs should not change during reload
Assert.assertEquals(queryResponse.get("totalDocs").asLong(),
numTotalDocs);
+
Assert.assertEquals(queryResponse.get("numConsumingSegmentsQueried").asLong(),
2);
+
Assert.assertTrue(queryResponse.get("minConsumingFreshnessTimeMs").asLong() >
_startTime);
+
Assert.assertTrue(queryResponse.get("minConsumingFreshnessTimeMs").asLong() <
System.currentTimeMillis());
return queryResponse.get("numEntriesScannedInFilter").asLong() == 0;
} catch (Exception e) {
throw new RuntimeException(e);
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index 64640a8..7ec4af2 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -129,15 +129,12 @@ public class MemoryEstimator {
// create mutable segment impl
MutableSegmentImpl mutableSegmentImpl = new
MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
- StreamMessageMetadata messageMetadata = new StreamMessageMetadata();
// read all rows and index them
try (PinotSegmentRecordReader segmentRecordReader = new
PinotSegmentRecordReader(_sampleCompletedSegment);) {
GenericRow row = new GenericRow();
while (segmentRecordReader.hasNext()) {
- messageMetadata.reset();
-
segmentRecordReader.next(row);
- mutableSegmentImpl.index(row, messageMetadata);
+ mutableSegmentImpl.index(row, null);
row.clear();
}
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]