This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bfdc74802c Simplify delay tracking in ingestion delay metric code
(#10101)
bfdc74802c is described below
commit bfdc74802ca7b2b3744fe33dd026ab09d56a79a8
Author: Juan Gomez <[email protected]>
AuthorDate: Wed Jan 11 18:09:57 2023 -0800
Simplify delay tracking in ingestion delay metric code (#10101)
---
.../manager/realtime/IngestionDelayTracker.java | 122 ++++++++++-----------
.../realtime/LLRealtimeSegmentDataManager.java | 8 +-
.../manager/realtime/RealtimeTableDataManager.java | 9 +-
.../realtime/IngestionDelayTrackerTest.java | 68 ++++++------
.../local/utils/tablestate/TableStateUtils.java | 2 +-
.../starter/helix/HelixInstanceDataManager.java | 6 +-
6 files changed, 102 insertions(+), 113 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 3031aee91b..43e72939e3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -22,6 +22,8 @@ package org.apache.pinot.core.data.manager.realtime;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -35,23 +37,20 @@ import org.slf4j.LoggerFactory;
/**
- * A Class to track realtime ingestion delay for a given table on a given
server.
+ * A Class to track realtime ingestion delay for table partitions on a given
server.
* Highlights:
* 1-An object of this class is hosted by each RealtimeTableDataManager.
* 2-The object tracks ingestion delays for all partitions hosted by the
current server for the given Realtime table.
* 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects
hosted in the corresponding
* RealtimeTableDataManager.
- * 4-A Metric is derived from reading the maximum tracked by this class. In
addition, individual metrics are associated
- * with each partition being tracked.
- * 5-Delays reported for partitions that do not have events to consume are
reported as zero.
- * 6-We track the time at which each delay sample was collected so that delay
can be increased when partition stops
- * consuming for any reason other than no events being available for
consumption.
- * 7-Partitions whose Segments go from CONSUMING to DROPPED state stop being
tracked so their delays do not cloud
+ * 4-Individual metrics are associated with each partition being tracked.
+ * 5-Delays for partitions that do not have events to consume are reported as
zero.
+ * 6-Partitions whose Segments go from CONSUMING to DROPPED state stop being
tracked so their delays do not cloud
* delays of active partitions.
- * 8-When a segment goes from CONSUMING to ONLINE, we start a timeout for the
corresponding partition.
+ * 7-When a segment goes from CONSUMING to ONLINE, we start a timeout for the
corresponding partition.
* If no consumption is noticed after the timeout, we then read ideal state
to confirm the server still hosts the
* partition. If not, we stop tracking the respective partition.
- * 9-A timer thread is started by this object to track timeouts of partitions
and drive the reading of their ideal
+ * 8-A timer thread is started by this object to track timeouts of partitions
and drive the reading of their ideal
* state.
*
* The following diagram illustrates the object interactions with main
external APIs
@@ -85,26 +84,12 @@ public class IngestionDelayTracker {
private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
private static final Logger _logger =
LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
- /*
- * Class to keep an ingestion delay measure and the time when the sample was
taken (i.e. sample time)
- * We will use the sample time to increase ingestion delay when a partition
stops consuming: the time
- * difference between the sample time and current time will be added to the
metric when read.
- */
- static private class DelayMeasure {
- public DelayMeasure(long t, long d) {
- _delayMs = d;
- _sampleTime = t;
- }
- public final long _delayMs;
- public final long _sampleTime;
- }
-
- // HashMap used to store delay measures for all partitions active for the
current table.
- private final ConcurrentHashMap<Integer, DelayMeasure>
_partitionToDelaySampleMap = new ConcurrentHashMap<>();
+ // HashMap used to store ingestion time measures for all partitions active
for the current table.
+ private final Map<Integer, Long> _partitionToIngestionTimeMsMap = new
ConcurrentHashMap<>();
// We mark partitions that go from CONSUMING to ONLINE in
_partitionsMarkedForVerification: if they do not
- // go back to CONSUMING in some period of time, we confirm whether they are
still hosted in this server by reading
+ // go back to CONSUMING in some period of time, we verify whether they are
still hosted in this server by reading
// ideal state. This is done with the goal of minimizing reading ideal state
for efficiency reasons.
- private final ConcurrentHashMap<Integer, Long>
_partitionsMarkedForVerification = new ConcurrentHashMap<>();
+ private final Map<Integer, Long> _partitionsMarkedForVerification = new
ConcurrentHashMap<>();
final int _timerThreadTickIntervalMs;
// Timer task to check partitions that are inactive against ideal state.
@@ -137,11 +122,11 @@ public class IngestionDelayTracker {
_timerThreadTickIntervalMs = timerThreadTickIntervalMs;
_timer = new Timer("IngestionDelayTimerThread-" +
TableNameBuilder.extractRawTableName(tableNameWithType));
_timer.schedule(new TimerTask() {
- @Override
- public void run() {
- timeoutInactivePartitions();
- }
- }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
+ @Override
+ public void run() {
+ timeoutInactivePartitions();
+ }
+ }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
}
public IngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
@@ -151,20 +136,20 @@ public class IngestionDelayTracker {
}
/*
- * Helper function to age a delay measure. Aging means adding the time
elapsed since the measure was
- * taken till the measure is being reported.
- *
- * @param currentDelay original sample delay to which we will add the age of
the measure.
+ * Helper function to get the ingestion delay for a given ingestion time.
+ * Ingestion delay == Current Time - Ingestion Time
+ *
+ * @param ingestionTimeMs original ingestion time in milliseconds.
*/
- private long getAgedDelay(DelayMeasure currentDelay) {
- if (currentDelay == null) {
+ private long getIngestionDelayMs(Long ingestionTimeMs) {
+ if (ingestionTimeMs == null) {
return 0; // return 0 when not initialized
}
- // Add age of measure to the reported value
- long measureAgeInMs = _clock.millis() - currentDelay._sampleTime;
+ // Compute aged delay for current partition
+ long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
// Correct to zero for any time shifts due to NTP or time reset.
- measureAgeInMs = Math.max(measureAgeInMs, 0);
- return currentDelay._delayMs + measureAgeInMs;
+ agedIngestionDelayMs = Math.max(agedIngestionDelayMs, 0);
+ return agedIngestionDelayMs;
}
/*
@@ -174,7 +159,7 @@ public class IngestionDelayTracker {
* @param partitionGroupId partition ID which we should stop tracking.
*/
private void removePartitionId(int partitionGroupId) {
- _partitionToDelaySampleMap.remove(partitionGroupId);
+ _partitionToIngestionTimeMsMap.remove(partitionGroupId);
// If we are removing a partition we should stop reading its ideal state.
_partitionsMarkedForVerification.remove(partitionGroupId);
_serverMetrics.removePartitionGauge(_metricName, partitionGroupId,
ServerGauge.REALTIME_INGESTION_DELAY_MS);
@@ -184,9 +169,9 @@ public class IngestionDelayTracker {
* Helper functions that creates a list of all the partitions that are
marked for verification and whose
* timeouts are expired. This helps us optimize checks of the ideal state.
*/
- private ArrayList<Integer> getPartitionsToBeVerified() {
- ArrayList<Integer> partitionsToVerify = new ArrayList<>();
- for (ConcurrentHashMap.Entry<Integer, Long> entry :
_partitionsMarkedForVerification.entrySet()) {
+ private List<Integer> getPartitionsToBeVerified() {
+ List<Integer> partitionsToVerify = new ArrayList<>();
+ for (Map.Entry<Integer, Long> entry :
_partitionsMarkedForVerification.entrySet()) {
long timeMarked = _clock.millis() - entry.getValue();
if (timeMarked > PARTITION_TIMEOUT_MS) {
// Partition must be verified
@@ -208,25 +193,24 @@ public class IngestionDelayTracker {
}
/*
- * Called by LLRealTimeSegmentDataManagers to post delay updates to this
tracker class.
+ * Called by LLRealTimeSegmentDataManagers to post ingestion time updates to
this tracker class.
*
- * @param delayMs ingestion delay being recorded.
- * @param sampleTime sample time.
- * @param partitionGroupId partition ID for which this delay is being
recorded.
+ * @param ingestionTimeMs ingestion time being recorded.
+ * @param partitionGroupId partition ID for which this ingestion time is
being recorded.
*/
- public void updateIngestionDelay(long delayMs, long sampleTime, int
partitionGroupId) {
+ public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId)
{
// Store new measure and wipe old one for this partition
// TODO: see if we can install gauges after the server is ready.
if (!_isServerReadyToServeQueries.get()) {
// Do not update the ingestion delay metrics during server startup period
return;
}
- DelayMeasure previousMeasure =
_partitionToDelaySampleMap.put(partitionGroupId,
- new DelayMeasure(sampleTime, delayMs));
+ Long previousMeasure = _partitionToIngestionTimeMsMap.put(partitionGroupId,
+ ingestionTimeMs);
if (previousMeasure == null) {
// First time we start tracking a partition we should start tracking it
via metric
_serverMetrics.addCallbackPartitionGaugeIfNeeded(_metricName,
partitionGroupId,
- ServerGauge.REALTIME_INGESTION_DELAY_MS, () ->
getPartitionIngestionDelay(partitionGroupId));
+ ServerGauge.REALTIME_INGESTION_DELAY_MS, () ->
getPartitionIngestionDelayMs(partitionGroupId));
}
// If we are consuming we do not need to track this partition for removal.
_partitionsMarkedForVerification.remove(partitionGroupId);
@@ -249,18 +233,23 @@ public class IngestionDelayTracker {
* This call is to be invoked by a timer thread that will periodically wake
up and invoke this function.
*/
public void timeoutInactivePartitions() {
- Set<Integer> partitionsHostedByThisServer = null;
+ if (!_isServerReadyToServeQueries.get()) {
+ // Do not update the tracker state during server startup period
+ return;
+ }
// Check if we have any partition to verify, else don't make the call to
check ideal state as that
// involves network traffic and may be inefficient.
- ArrayList<Integer> partitionsToVerify = getPartitionsToBeVerified();
+ List<Integer> partitionsToVerify = getPartitionsToBeVerified();
if (partitionsToVerify.size() == 0) {
// Don't make the call to getHostedPartitionsGroupIds() as it involves
checking ideal state.
return;
}
+ Set<Integer> partitionsHostedByThisServer;
try {
partitionsHostedByThisServer =
_realTimeTableDataManager.getHostedPartitionsGroupIds();
} catch (Exception e) {
- _logger.error("Failed to get partitions hosted by this server,
table={}", _tableNameWithType);
+ _logger.error("Failed to get partitions hosted by this server, table={},
exception={}:{}", _tableNameWithType,
+ e.getClass(), e.getMessage());
return;
}
for (int partitionGroupId : partitionsToVerify) {
@@ -278,6 +267,10 @@ public class IngestionDelayTracker {
* @param partitionGroupId Partition id that we need confirmed via ideal
state as still hosted by this server.
*/
public void markPartitionForVerification(int partitionGroupId) {
+ if (!_isServerReadyToServeQueries.get()) {
+ // Do not update the tracker state during server startup period
+ return;
+ }
_partitionsMarkedForVerification.put(partitionGroupId, _clock.millis());
}
@@ -288,9 +281,10 @@ public class IngestionDelayTracker {
*
* @return ingestion delay in milliseconds for the given partition ID.
*/
- public long getPartitionIngestionDelay(int partitionGroupId) {
- DelayMeasure currentMeasure =
_partitionToDelaySampleMap.get(partitionGroupId);
- return getAgedDelay(currentMeasure);
+ public long getPartitionIngestionDelayMs(int partitionGroupId) {
+ // Not protected as this will only be invoked when metric is installed
which happens after server ready
+ Long currentMeasure = _partitionToIngestionTimeMsMap.get(partitionGroupId);
+ return getIngestionDelayMs(currentMeasure);
}
/*
@@ -299,9 +293,13 @@ public class IngestionDelayTracker {
*/
public void shutdown() {
// Now that segments can't report metric, destroy metric for this table
- _timer.cancel();
+ _timer.cancel(); // Timer is installed in constructor so must always be
cancelled
+ if (!_isServerReadyToServeQueries.get()) {
+ // Do not update the tracker state during server startup period
+ return;
+ }
// Remove partitions so their related metrics get uninstalled.
- for (ConcurrentHashMap.Entry<Integer, DelayMeasure> entry :
_partitionToDelaySampleMap.entrySet()) {
+ for (Map.Entry<Integer, Long> entry :
_partitionToIngestionTimeMsMap.entrySet()) {
removePartitionId(entry.getKey());
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fe7c0f30b1..45570b59a4 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -614,11 +614,11 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
indexedMessageCount, streamMessageCount, _currentOffset);
}
} else if (!prematureExit) {
+ // Record Pinot ingestion delay as zero since we are up-to-date and no
new events
+
_realtimeTableDataManager.updateIngestionDelay(System.currentTimeMillis(),
_partitionGroupId);
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("empty batch received - sleeping for {}ms",
idlePipeSleepTimeMillis);
}
- // Record Pinot ingestion delay as zero since we are up-to-date and no
new events
- _realtimeTableDataManager.updateIngestionDelay(0,
System.currentTimeMillis(), _partitionGroupId);
// If there were no messages to be fetched from stream, wait for a
little bit as to avoid hammering the stream
Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis,
TimeUnit.MILLISECONDS);
}
@@ -1569,10 +1569,8 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
*/
private void updateIngestionDelay(int indexedMessageCount) {
if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
- long ingestionDelayMs = _lastConsumedTimestampMs -
_lastRowMetadata.getRecordIngestionTimeMs();
- ingestionDelayMs = Math.max(ingestionDelayMs, 0);
// Record Ingestion delay for this partition
- _realtimeTableDataManager.updateIngestionDelay(ingestionDelayMs,
_lastConsumedTimestampMs,
+
_realtimeTableDataManager.updateIngestionDelay(_lastRowMetadata.getRecordIngestionTimeMs(),
_partitionGroupId);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 0527aac163..4ae54b6bbc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -237,12 +237,11 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
/*
* Method used by LLRealtimeSegmentManagers to update their partition delays
*
- * @param ingestionDelayMs Ingestion delay being reported.
- * @param currentTimeMs Timestamp of the measure being provided, i.e. when
this delay was computed.
+ * @param ingestionTimeMs Ingestion delay being reported.
* @param partitionGroupId Partition ID for which delay is being updated.
*/
- public void updateIngestionDelay(long ingestionDelayMs, long currenTimeMs,
int partitionGroupId) {
- _ingestionDelayTracker.updateIngestionDelay(ingestionDelayMs,
currenTimeMs, partitionGroupId);
+ public void updateIngestionDelay(long ingestionTimeMs, int partitionGroupId)
{
+ _ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
partitionGroupId);
}
/*
@@ -272,7 +271,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
/**
* Returns all partitionGroupIds for the partitions hosted by this server
for current table.
- * @Note: this involves Zookeeper read and should not be used frequently due
to efficiency concerns.
+ * @apiNote this involves Zookeeper read and should not be used frequently
due to efficiency concerns.
*/
public Set<Integer> getHostedPartitionsGroupIds() {
Set<Integer> partitionsHostedByThisServer = new HashSet<>();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index b91d714e11..e2d19fc0ac 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -39,7 +39,7 @@ public class IngestionDelayTrackerTest {
new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
realtimeTableDataManager, () -> true);
// With no samples, the time reported must be zero
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0),
0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0),
0);
return ingestionDelayTracker;
}
@@ -51,21 +51,22 @@ public class IngestionDelayTrackerTest {
IngestionDelayTracker ingestionDelayTracker =
new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
realtimeTableDataManager, () -> true);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0),
0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0),
0);
ingestionDelayTracker.shutdown();
// Test constructor with timer arguments
ingestionDelayTracker =
new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
realtimeTableDataManager, TIMER_THREAD_TICK_INTERVAL_MS, () ->
true);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0),
0);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0),
0);
// Test bad timer args to the constructor
try {
- ingestionDelayTracker =
- new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
+ new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
realtimeTableDataManager, 0, () -> true);
- Assert.assertTrue(false); // Constructor must assert
+ Assert.fail("Must have asserted due to invalid arguments"); //
Constructor must assert
} catch (Exception e) {
- Assert.assertTrue(e instanceof RuntimeException);
+ if ((e instanceof NullPointerException) || !(e instanceof
RuntimeException)) {
+ Assert.fail(String.format("Unexpected exception: %s:%s", e.getClass(),
e.getMessage()));
+ }
}
}
@@ -84,29 +85,29 @@ public class IngestionDelayTrackerTest {
// Test we follow a single partition up and down
for (long i = 0; i <= maxTestDelay; i++) {
- ingestionDelayTracker.updateIngestionDelay(i, 0, partition0);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
i + clock.millis());
+ ingestionDelayTracker.updateIngestionDelay(i, partition0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
clock.millis() - i);
}
// Test tracking down a measure for a given partition
for (long i = maxTestDelay; i >= 0; i--) {
- ingestionDelayTracker.updateIngestionDelay(i, 0, partition0);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
i + clock.millis());
+ ingestionDelayTracker.updateIngestionDelay(i, partition0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
clock.millis() - i);
}
// Make the current partition maximum
- ingestionDelayTracker.updateIngestionDelay(maxTestDelay, 0, partition0);
+ ingestionDelayTracker.updateIngestionDelay(maxTestDelay, partition0);
// Bring up partition1 delay up and verify values
for (long i = 0; i <= 2 * maxTestDelay; i++) {
- ingestionDelayTracker.updateIngestionDelay(i, 0, partition1);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1),
i + clock.millis());
+ ingestionDelayTracker.updateIngestionDelay(i, partition1);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
clock.millis() - i);
}
// Bring down values of partition1 and verify values
for (long i = 2 * maxTestDelay; i >= 0; i--) {
- ingestionDelayTracker.updateIngestionDelay(i, 0, partition1);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1),
i + clock.millis());
+ ingestionDelayTracker.updateIngestionDelay(i, partition1);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
clock.millis() - i);
}
ingestionDelayTracker.shutdown();
@@ -121,12 +122,8 @@ public class IngestionDelayTrackerTest {
final long partition0Offset0Ms = 300;
final long partition0Offset1Ms = 1000;
final int partition1 = 1;
- final long partition1Delay0 = 11; // Record something slightly higher than
previous max
- final long partition1Delay1 = 8; // Record something lower so that
partition0 is the current max again
+ final long partition1Delay0 = 11;
final long partition1Offset0Ms = 150;
- final long partition1Offset1Ms = 450;
-
- final long sleepMs = 500;
IngestionDelayTracker ingestionDelayTracker = createTracker();
@@ -135,31 +132,28 @@ public class IngestionDelayTrackerTest {
ZoneId zoneId = ZoneId.systemDefault();
Clock clock = Clock.fixed(now, zoneId);
ingestionDelayTracker.setClock(clock);
- ingestionDelayTracker.updateIngestionDelay(partition0Delay0,
clock.millis(), partition0);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
partition0Delay0);
+ ingestionDelayTracker.updateIngestionDelay(clock.millis() -
partition0Delay0, partition0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
partition0Delay0);
// Advance clock and test aging
Clock offsetClock = Clock.offset(clock,
Duration.ofMillis(partition0Offset0Ms));
ingestionDelayTracker.setClock(offsetClock);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
(partition0Delay0 + partition0Offset0Ms));
- // Add a new value below max and verify we are tracking the new max
correctly
- ingestionDelayTracker.updateIngestionDelay(partition0Delay1,
offsetClock.millis(), partition0);
- Clock partition0LastUpdate = Clock.offset(offsetClock, Duration.ZERO); //
Save this as we need to verify aging later
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
partition0Delay1);
+ ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() -
partition0Delay1, partition0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
partition0Delay1);
// Add some offset to the last sample and make sure we age that measure
properly
offsetClock = Clock.offset(offsetClock,
Duration.ofMillis(partition0Offset1Ms));
ingestionDelayTracker.setClock(offsetClock);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
(partition0Delay1 + partition0Offset1Ms));
- // Now try setting a new maximum in another partition
- ingestionDelayTracker.updateIngestionDelay(partition1Delay0,
offsetClock.millis(), partition1);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1),
partition1Delay0);
+ ingestionDelayTracker.updateIngestionDelay(offsetClock.millis() -
partition1Delay0, partition1);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
partition1Delay0);
// Add some offset to the last sample and make sure we age that measure
properly
offsetClock = Clock.offset(offsetClock,
Duration.ofMillis(partition1Offset0Ms));
ingestionDelayTracker.setClock(offsetClock);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1),
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
(partition1Delay0 + partition1Offset0Ms));
ingestionDelayTracker.shutdown();
@@ -179,16 +173,16 @@ public class IngestionDelayTrackerTest {
// Record a number of partitions with delay equal to partition id
for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay;
partitionGroupId++) {
- ingestionDelayTracker.updateIngestionDelay(partitionGroupId,
clock.millis(), partitionGroupId);
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId),
partitionGroupId);
+ ingestionDelayTracker.updateIngestionDelay(clock.millis() -
partitionGroupId, partitionGroupId);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId),
partitionGroupId);
}
// Verify that as we remove partitions the next available maximum takes
over
for (int partitionGroupId = maxPartition; partitionGroupId >= 0;
partitionGroupId--) {
- ingestionDelayTracker.stopTrackingPartitionIngestionDelay((int)
partitionGroupId);
+
ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
}
for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay;
partitionGroupId++) {
// Untracked partitions must return 0
-
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId),
0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId),
0);
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index 67be761616..c11886f613 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -42,7 +42,7 @@ public class TableStateUtils {
* Returns all segments in a given state for a given table.
*
* @param helixManager instance of Helix manager
- * @param tableNameWithType table for which we are obtaining ONLINE segments
+ * @param tableNameWithType table for which we are obtaining segments in a
given state
* @param state state of the segments to be returned
*
* @return List of segment names in a given state.
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index a52d627aac..c4eb0d3e6b 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -90,7 +90,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
private ServerMetrics _serverMetrics;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private SegmentUploader _segmentUploader;
- private Supplier<Boolean> _isReadyToServeQueries = () -> true;
+ private Supplier<Boolean> _isServerReadyToServeQueries = () -> false;
// Fixed size LRU cache for storing last N errors on the instance.
// Key is TableNameWithType-SegmentName pair.
@@ -98,7 +98,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
@Override
public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean>
isServingQueries) {
- _isReadyToServeQueries = isServingQueries;
+ _isServerReadyToServeQueries = isServingQueries;
}
@Override
@@ -195,7 +195,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
TableDataManagerConfig tableDataManagerConfig = new
TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
TableDataManager tableDataManager =
TableDataManagerProvider.getTableDataManager(tableDataManagerConfig,
_instanceId, _propertyStore,
- _serverMetrics, _helixManager, _errorCache,
_isReadyToServeQueries);
+ _serverMetrics, _helixManager, _errorCache,
_isServerReadyToServeQueries);
tableDataManager.start();
LOGGER.info("Created table data manager for table: {}", tableNameWithType);
return tableDataManager;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]