This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9225ec0144 Add a tracker for end-to-end consumption delay of events.
(#10121)
9225ec0144 is described below
commit 9225ec01449ee1b894ffaaa01e7e2e3d9e914cb1
Author: Juan Gomez <[email protected]>
AuthorDate: Mon Jan 23 15:33:04 2023 -0800
Add a tracker for end-to-end consumption delay of events. (#10121)
---
.../apache/pinot/common/metrics/ServerGauge.java | 5 +-
.../manager/realtime/IngestionDelayTracker.java | 59 ++++++++++++++----
.../realtime/LLRealtimeSegmentDataManager.java | 4 +-
.../manager/realtime/RealtimeTableDataManager.java | 4 +-
.../realtime/IngestionDelayTrackerTest.java | 69 +++++++++++++++++-----
.../org/apache/pinot/spi/stream/RowMetadata.java | 18 +++++-
.../pinot/spi/stream/StreamMessageMetadata.java | 21 +++++--
7 files changed, 141 insertions(+), 39 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 7dd52e90ea..f3d0fa95eb 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -46,8 +46,9 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
CONSUMPTION_QUOTA_UTILIZATION("ratio", false),
JVM_HEAP_USED_BYTES("bytes", true),
- // Ingestion delay metric
- REALTIME_INGESTION_DELAY_MS("milliseconds", false);
+ // Ingestion delay metrics
+ REALTIME_INGESTION_DELAY_MS("milliseconds", false),
+ END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false);
private final String _gaugeName;
private final String _unit;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 6452866195..6e11297fd0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -76,6 +76,15 @@ import org.slf4j.LoggerFactory;
public class IngestionDelayTracker {
+ // Class to wrap supported timestamps collected for an ingested event
+ private static class IngestionTimestamps {
+ IngestionTimestamps(long ingestionTimesMs, long
firstStreamIngestionTimeMs) {
+ _ingestionTimeMs = ingestionTimesMs;
+ _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
+ }
+ private final long _ingestionTimeMs;
+ private final long _firstStreamIngestionTimeMs;
+ }
// Sleep interval for timer thread that triggers read of ideal state
private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5
minutes +/- precision in timeouts
// Once a partition is marked for verification, we wait 10 minutes to pull
its ideal state.
@@ -85,7 +94,7 @@ public class IngestionDelayTracker {
private static final Logger _logger =
LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
// HashMap used to store ingestion time measures for all partitions active
for the current table.
- private final Map<Integer, Long> _partitionToIngestionTimeMsMap = new
ConcurrentHashMap<>();
+ private final Map<Integer, IngestionTimestamps>
_partitionToIngestionTimestampsMap = new ConcurrentHashMap<>();
// We mark partitions that go from CONSUMING to ONLINE in
_partitionsMarkedForVerification: if they do not
// go back to CONSUMING in some period of time, we verify whether they are
still hosted in this server by reading
// ideal state. This is done with the goal of minimizing reading ideal state
for efficiency reasons.
@@ -141,9 +150,9 @@ public class IngestionDelayTracker {
*
* @param ingestionTimeMs original ingestion time in milliseconds.
*/
- private long getIngestionDelayMs(Long ingestionTimeMs) {
- if (ingestionTimeMs == null) {
- return 0; // return 0 when not initialized
+ private long getIngestionDelayMs(long ingestionTimeMs) {
+ if (ingestionTimeMs < 0) {
+ return 0;
}
// Compute aged delay for current partition
long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
@@ -159,7 +168,7 @@ public class IngestionDelayTracker {
* @param partitionGroupId partition ID which we should stop tracking.
*/
private void removePartitionId(int partitionGroupId) {
- _partitionToIngestionTimeMsMap.remove(partitionGroupId);
+ _partitionToIngestionTimestampsMap.remove(partitionGroupId);
// If we are removing a partition we should stop reading its ideal state.
_partitionsMarkedForVerification.remove(partitionGroupId);
_serverMetrics.removePartitionGauge(_metricName, partitionGroupId,
ServerGauge.REALTIME_INGESTION_DELAY_MS);
@@ -196,21 +205,28 @@ public class IngestionDelayTracker {
* Called by LLRealTimeSegmentDataManagers to post ingestion time updates to
this tracker class.
*
* @param ingestionTimeMs ingestion time being recorded.
+ * @param firstStreamIngestionTimeMs time the event was ingested in the
first stage of the ingestion pipeline.
* @param partitionGroupId partition ID for which this ingestion time is
being recorded.
*/
- public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId)
{
+ public void updateIngestionDelay(long ingestionTimeMs, long
firstStreamIngestionTimeMs, int partitionGroupId) {
// Store new measure and wipe old one for this partition
- // TODO: see if we can install gauges after the server is ready.
if (!_isServerReadyToServeQueries.get()) {
// Do not update the ingestion delay metrics during server startup period
return;
}
- Long previousMeasure = _partitionToIngestionTimeMsMap.put(partitionGroupId,
- ingestionTimeMs);
+ IngestionTimestamps previousMeasure =
_partitionToIngestionTimestampsMap.put(partitionGroupId,
+ new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
if (previousMeasure == null) {
// First time we start tracking a partition we should start tracking it
via metric
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
ServerGauge.REALTIME_INGESTION_DELAY_MS, () ->
getPartitionIngestionDelayMs(partitionGroupId));
+ if (firstStreamIngestionTimeMs >= 0) {
+ // Only publish this metric when creation time is supported by the
underlying stream
+ // When this timestamp is not supported it always returns the value
Long.MIN_VALUE
+ _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
+ ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
+ () -> getPartitionEndToEndIngestionDelayMs(partitionGroupId));
+ }
}
// If we are consuming we do not need to track this partition for removal.
_partitionsMarkedForVerification.remove(partitionGroupId);
@@ -283,8 +299,27 @@ public class IngestionDelayTracker {
*/
public long getPartitionIngestionDelayMs(int partitionGroupId) {
// Not protected as this will only be invoked when metric is installed
which happens after server ready
- Long currentMeasure = _partitionToIngestionTimeMsMap.get(partitionGroupId);
- return getIngestionDelayMs(currentMeasure);
+ IngestionTimestamps currentMeasure =
_partitionToIngestionTimestampsMap.get(partitionGroupId);
+ if (currentMeasure == null) { // Guard just in case we read the metric
without initializing it
+ return 0;
+ }
+ return getIngestionDelayMs(currentMeasure._ingestionTimeMs);
+ }
+
+ /*
+ * Method to get end to end ingestion delay for a given partition.
+ *
+ * @param partitionGroupId partition for which we are retrieving the delay
+ *
+ * @return End to end ingestion delay in milliseconds for the given
partition ID.
+ */
+ public long getPartitionEndToEndIngestionDelayMs(int partitionGroupId) {
+ // Not protected as this will only be invoked when metric is installed
which happens after server ready
+ IngestionTimestamps currentMeasure =
_partitionToIngestionTimestampsMap.get(partitionGroupId);
+ if (currentMeasure == null) { // Guard just in case we read the metric
without initializing it
+ return 0;
+ }
+ return getIngestionDelayMs(currentMeasure._firstStreamIngestionTimeMs);
}
/*
@@ -299,7 +334,7 @@ public class IngestionDelayTracker {
return;
}
// Remove partitions so their related metrics get uninstalled.
- for (Map.Entry<Integer, Long> entry :
_partitionToIngestionTimeMsMap.entrySet()) {
+ for (Map.Entry<Integer, IngestionTimestamps> entry :
_partitionToIngestionTimestampsMap.entrySet()) {
removePartitionId(entry.getKey());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 45570b59a4..aa83cda832 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -615,7 +615,8 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
}
} else if (!prematureExit) {
// Record Pinot ingestion delay as zero since we are up-to-date and no
new events
-
_realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(),
_partitionGroupId);
+ long currentTimeMs = System.currentTimeMillis();
+ _realtimeTableDataManager.updateIngestionDelay(currentTimeMs,
currentTimeMs, _partitionGroupId);
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("empty batch received - sleeping for {}ms",
idlePipeSleepTimeMillis);
}
@@ -1571,6 +1572,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
// Record Ingestion delay for this partition
_realtimeTableDataManager.updateIngestionDelay(_lastRowMetadata.getRecordIngestionTimeMs(),
+ _lastRowMetadata.getFirstStreamRecordIngestionTimeMs(),
_partitionGroupId);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 4ae54b6bbc..5f5a47207c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -240,8 +240,8 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
* @param ingestionTimeMs Ingestion delay being reported.
* @param partitionGroupId Partition ID for which delay is being updated.
*/
- public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId)
{
- _ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
partitionGroupId);
+ public void updateIngestionDelay(long ingestionTimeMs, long
firstStreamIngestionTimeMs, int partitionGroupId) {
+ _ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
firstStreamIngestionTimeMs, partitionGroupId);
}
/*
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index e2d19fc0ac..177fe33269 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -85,29 +85,37 @@ public class IngestionDelayTrackerTest {
// Test we follow a single partition up and down
for (long i = 0; i <= maxTestDelay; i++) {
- ingestionDelayTracker.updateIngestionDelay(i, partition0);
+ ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
clock.millis() - i);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+ clock.millis() - (i + 1));
}
// Test tracking down a measure for a given partition
for (long i = maxTestDelay; i >= 0; i--) {
- ingestionDelayTracker.updateIngestionDelay(i, partition0);
+ ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
clock.millis() - i);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+ clock.millis() - (i + 1));
}
// Make the current partition maximum
- ingestionDelayTracker.updateIngestionDelay(maxTestDelay, partition0);
+ ingestionDelayTracker.updateIngestionDelay(maxTestDelay, maxTestDelay,
partition0);
// Bring up partition1 delay up and verify values
for (long i = 0; i <= 2 * maxTestDelay; i++) {
- ingestionDelayTracker.updateIngestionDelay(i, partition1);
+ ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
clock.millis() - i);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
+ clock.millis() - (i + 1));
}
// Bring down values of partition1 and verify values
for (long i = 2 * maxTestDelay; i >= 0; i--) {
- ingestionDelayTracker.updateIngestionDelay(i, partition1);
+ ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
clock.millis() - i);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
+ clock.millis() - (i + 1));
}
ingestionDelayTracker.shutdown();
@@ -132,24 +140,34 @@ public class IngestionDelayTrackerTest {
ZoneId zoneId = ZoneId.systemDefault();
Clock clock = Clock.fixed(now, zoneId);
ingestionDelayTracker.setClock(clock);
- ingestionDelayTracker.updateIngestionDelay(clock.millis() -
partition0Delay0, partition0);
+ ingestionDelayTracker.updateIngestionDelay((clock.millis() -
partition0Delay0),
+ (clock.millis() - partition0Delay0), partition0);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
partition0Delay0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
partition0Delay0);
// Advance clock and test aging
Clock offsetClock = Clock.offset(clock,
Duration.ofMillis(partition0Offset0Ms));
ingestionDelayTracker.setClock(offsetClock);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
(partition0Delay0 + partition0Offset0Ms));
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
+ (partition0Delay0 + partition0Offset0Ms));
- ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() -
partition0Delay1, partition0);
+ ingestionDelayTracker.updateIngestionDelay((offsetClock.millis() -
partition0Delay1),
+ (offsetClock.millis() - partition0Delay1), partition0);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
partition0Delay1);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
partition0Delay1);
+
// Add some offset to the last sample and make sure we age that measure
properly
offsetClock = Clock.offset(offsetClock,
Duration.ofMillis(partition0Offset1Ms));
ingestionDelayTracker.setClock(offsetClock);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
(partition0Delay1 + partition0Offset1Ms));
- ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() -
partition1Delay0, partition1);
+ ingestionDelayTracker.updateIngestionDelay((offsetClock.millis() -
partition1Delay0),
+ (offsetClock.millis() - partition1Delay0), partition1);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
partition1Delay0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
partition1Delay0);
+
// Add some offset to the last sample and make sure we age that measure
properly
offsetClock = Clock.offset(offsetClock,
Duration.ofMillis(partition1Offset0Ms));
ingestionDelayTracker.setClock(offsetClock);
@@ -173,26 +191,45 @@ public class IngestionDelayTrackerTest {
// Record a number of partitions with delay equal to partition id
for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay;
partitionGroupId++) {
- ingestionDelayTracker.updateIngestionDelay(clock.millis() -
partitionGroupId, partitionGroupId);
+ ingestionDelayTracker.updateIngestionDelay((clock.millis() -
partitionGroupId),
+ (clock.millis() - partitionGroupId), partitionGroupId);
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId),
partitionGroupId);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
+ partitionGroupId);
}
- // Verify that as we remove partitions the next available maximum takes
over
for (int partitionGroupId = maxPartition; partitionGroupId >= 0;
partitionGroupId--) {
ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
}
for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay;
partitionGroupId++) {
// Untracked partitions must return 0
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId),
0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
0);
}
}
@Test
- public void testTickInactivePartitions() {
- Assert.assertTrue(true);
- }
+ public void testShutdown() {
+ final long maxTestDelay = 100;
- @Test
- public void testMarkPartitionForConfirmation() {
- Assert.assertTrue(true);
+ IngestionDelayTracker ingestionDelayTracker = createTracker();
+ // Use fixed clock so samples don't age
+ Instant now = Instant.now();
+ ZoneId zoneId = ZoneId.systemDefault();
+ Clock clock = Clock.fixed(now, zoneId);
+ ingestionDelayTracker.setClock(clock);
+
+ // Test Shutdown with partitions active
+ for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay;
partitionGroupId++) {
+ ingestionDelayTracker.updateIngestionDelay((clock.millis() -
partitionGroupId),
+ (clock.millis() - partitionGroupId), partitionGroupId);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId),
partitionGroupId);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
+ partitionGroupId);
+ }
+ ingestionDelayTracker.shutdown();
+
+ // Test shutdown with no partitions
+ ingestionDelayTracker = createTracker();
+ ingestionDelayTracker.shutdown();
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
index 4c4f17792e..8a5eac3981 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java
@@ -39,8 +39,8 @@ public interface RowMetadata {
/**
* Returns the timestamp associated with the record. This typically refers
to the time it was ingested into the
- * upstream source. In some cases, it may be the time at which the record
was created, aka event time (eg. in kafka,
- * a topic may be configured to use record `CreateTime` instead of
`LogAppendTime`).
+ * (last) upstream source. In some cases, it may be the time at which the
record was created, aka event time
+ * (eg. in kafka, a topic may be configured to use record `CreateTime`
instead of `LogAppendTime`).
*
* Expected to be used for stream-based sources.
*
@@ -49,6 +49,20 @@ public interface RowMetadata {
*/
long getRecordIngestionTimeMs();
+ /**
+ * When supported by the underlying stream, this method returns the
timestamp in milliseconds associated with
+ * the ingestion of the record in the first stream.
+ *
+ * Complex ingestion pipelines may be composed of multiple streams:
+ * (EventCreation) -> {First Stream} -> ... -> {Last Stream}
+ *
+ * @return timestamp (epoch in milliseconds) when the row was initially
ingested upstream for the first
+ * time Long.MIN_VALUE if not supported by the underlying stream.
+ */
+ default long getFirstStreamRecordIngestionTimeMs() {
+ return Long.MIN_VALUE;
+ }
+
/**
* Returns the stream message headers
*
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
index 2b0860690a..ac67249441 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java
@@ -30,23 +30,31 @@ import org.apache.pinot.spi.data.readers.GenericRow;
*/
public class StreamMessageMetadata implements RowMetadata {
private final long _recordIngestionTimeMs;
+ private final long _firstStreamRecordIngestionTimeMs;
private final GenericRow _headers;
private final Map<String, String> _metadata;
public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable
GenericRow headers) {
- this(recordIngestionTimeMs, headers, Collections.emptyMap());
+ this(recordIngestionTimeMs, Long.MIN_VALUE, headers,
Collections.emptyMap());
}
+ public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable
GenericRow headers,
+ Map<String, String> metadata) {
+ this(recordIngestionTimeMs, Long.MIN_VALUE, headers, metadata);
+ }
/**
* Construct the stream based message/row message metadata
*
- * @param recordIngestionTimeMs the time that the message was ingested by
the stream provider
+ * @param recordIngestionTimeMs the time that the message was ingested by
the stream provider.
* use Long.MIN_VALUE if not applicable
+ * @param firstStreamRecordIngestionTimeMs the time that the message was
ingested by the first stream provider
+ * in the ingestion pipeline. use Long.MIN_VALUE if
not applicable
* @param metadata
*/
- public StreamMessageMetadata(long recordIngestionTimeMs, @Nullable
GenericRow headers,
- Map<String, String> metadata) {
+ public StreamMessageMetadata(long recordIngestionTimeMs, long
firstStreamRecordIngestionTimeMs,
+ @Nullable GenericRow headers, Map<String, String> metadata) {
_recordIngestionTimeMs = recordIngestionTimeMs;
+ _firstStreamRecordIngestionTimeMs = firstStreamRecordIngestionTimeMs;
_headers = headers;
_metadata = metadata;
}
@@ -56,6 +64,11 @@ public class StreamMessageMetadata implements RowMetadata {
return _recordIngestionTimeMs;
}
+ @Override
+ public long getFirstStreamRecordIngestionTimeMs() {
+ return _firstStreamRecordIngestionTimeMs;
+ }
+
@Override
public GenericRow getHeaders() {
return _headers;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]