This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new afd5530230 allow passing freshness checker after an idle threshold
(#11345)
afd5530230 is described below
commit afd5530230cb73592a65ab8990148fc405d77a9e
Author: Johan Adami <[email protected]>
AuthorDate: Tue Aug 22 02:41:20 2023 -0400
allow passing freshness checker after an idle threshold (#11345)
---
.../core/data/manager/realtime/IdleTimer.java | 71 +++++++++++
.../realtime/LLRealtimeSegmentDataManager.java | 53 ++++----
.../core/data/manager/realtime/IdleTimerTest.java | 123 ++++++++++++++++++
.../server/starter/helix/BaseServerStarter.java | 8 +-
.../FreshnessBasedConsumptionStatusChecker.java | 26 +++-
...FreshnessBasedConsumptionStatusCheckerTest.java | 137 +++++++++++++++++++--
.../apache/pinot/spi/utils/CommonConstants.java | 12 ++
7 files changed, 394 insertions(+), 36 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IdleTimer.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IdleTimer.java
new file mode 100644
index 0000000000..bb44bdaae1
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IdleTimer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+/**
+ * The IdleTimer is responsible for keeping track of 2 different idle times:
+ * - The stream idle time, which resets every time we remake the stream consumer or consume an event.
+ * This depends on the user-configured "idle.timeout.millis" stream config.
+ * - The total idle time, which only resets when we consume something.
+ */
+public class IdleTimer {
+
+ private volatile long _timeWhenStreamLastCreatedOrConsumedMs = 0;
+ private volatile long _timeWhenEventLastConsumedMs = 0;
+
+ public IdleTimer() {
+ }
+
+ protected long now() {
+ return System.currentTimeMillis();
+ }
+
+ public void init() {
+ long nowMs = now();
+ // When an event is consumed, we consider the stream no longer idle.
+ // Event consumption idleness should always be greater than or equal to stream
+ // idleness, since we recreate the stream after some amount of idleness,
+ // but that does not guarantee we'll consume an event.
+ _timeWhenStreamLastCreatedOrConsumedMs = nowMs;
+ _timeWhenEventLastConsumedMs = nowMs;
+ }
+
+ public void markStreamCreated() {
+ _timeWhenStreamLastCreatedOrConsumedMs = now();
+ }
+
+ public void markEventConsumed() {
+ init();
+ }
+
+ public long getTimeSinceStreamLastCreatedOrConsumedMs() {
+ if (_timeWhenStreamLastCreatedOrConsumedMs == 0) {
+ return 0;
+ }
+ return now() - _timeWhenStreamLastCreatedOrConsumedMs;
+ }
+
+ public long getTimeSinceEventLastConsumedMs() {
+ if (_timeWhenEventLastConsumedMs == 0) {
+ return 0;
+ }
+ return now() - _timeWhenEventLastConsumedMs;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index b2c9bdcfaf..4acab57731 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -242,6 +242,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
private volatile int _numRowsErrored = 0;
private volatile int _consecutiveErrorCount = 0;
private long _startTimeMs = 0;
+ private final IdleTimer _idleTimer = new IdleTimer();
private final String _segmentNameStr;
private final SegmentVersion _segmentVersion;
private final SegmentBuildTimeLeaseExtender _leaseExtender;
@@ -404,8 +405,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_numRowsErrored = 0;
final long idlePipeSleepTimeMillis = 100;
final long idleTimeoutMillis =
_partitionLevelStreamConfig.getIdleTimeoutMillis();
- long idleStartTimeMillis = -1;
- boolean idle = false;
+ _idleTimer.init();
StreamPartitionMsgOffset lastUpdatedOffset =
_streamPartitionMsgOffsetFactory
.create(_currentOffset); // so that we always update the metric when
we enter this method.
@@ -443,7 +443,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
boolean endCriteriaReached = processStreamEvents(messageBatch,
idlePipeSleepTimeMillis);
if (_currentOffset.compareTo(lastUpdatedOffset) != 0) {
- idle = false;
+ _idleTimer.markEventConsumed();
// We consumed something. Update the highest stream offset as well as
partition-consuming metric.
// TODO Issue 5359 Need to find a way to bump metrics without getting
actual offset value.
if (_currentOffset instanceof LongMsgOffset) {
@@ -462,7 +462,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
}
// We check this flag again further down
} else if (messageBatch.getUnfilteredMessageCount() > 0) {
- idle = false;
+ _idleTimer.markEventConsumed();
// we consumed something from the stream but filtered all the content
out,
// so we need to advance the offsets to avoid getting stuck
StreamPartitionMsgOffset nextOffset =
messageBatch.getOffsetOfNextBatch();
@@ -473,21 +473,16 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
lastUpdatedOffset =
_streamPartitionMsgOffsetFactory.create(nextOffset);
} else {
// We did not consume any rows.
- if (!idle) {
- idleStartTimeMillis = now();
- idle = true;
- }
- if (idleTimeoutMillis >= 0) {
- long totalIdleTimeMillis = now() - idleStartTimeMillis;
- if (totalIdleTimeMillis > idleTimeoutMillis) {
- // Update the partition-consuming metric only if we have been
idling beyond idle timeout.
- // Create a new stream consumer wrapper, in case we are stuck on
something.
- _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
- recreateStreamConsumer(
- String.format("Total idle time: %d ms exceeded idle timeout:
%d ms", totalIdleTimeMillis,
- idleTimeoutMillis));
- idle = false;
- }
+ long timeSinceStreamLastCreatedOrConsumedMs =
_idleTimer.getTimeSinceStreamLastCreatedOrConsumedMs();
+
+ if (idleTimeoutMillis >= 0 && (timeSinceStreamLastCreatedOrConsumedMs
> idleTimeoutMillis)) {
+ // Update the partition-consuming metric only if we have been idling
beyond idle timeout.
+ // Create a new stream consumer wrapper, in case we are stuck on
something.
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
+ recreateStreamConsumer(
+ String.format("Total idle time: %d ms exceeded idle timeout: %d
ms",
+ timeSinceStreamLastCreatedOrConsumedMs, idleTimeoutMillis));
+ _idleTimer.markStreamCreated();
}
}
@@ -1507,17 +1502,29 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
}
}
+ public long getTimeSinceEventLastConsumedMs() {
+ return _idleTimer.getTimeSinceEventLastConsumedMs();
+ }
+
public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
+ return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
maxWaitTimeMs);
+ }
+
+ public StreamPartitionMsgOffset fetchEarliestStreamOffset(long
maxWaitTimeMs) {
+ return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
maxWaitTimeMs);
+ }
+
+ private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria
offsetCriteria, long maxWaitTimeMs) {
if (_partitionMetadataProvider == null) {
createPartitionMetadataProvider("Fetch latest stream offset");
}
try {
- return
_partitionMetadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
- maxWaitTimeMs);
+ return
_partitionMetadataProvider.fetchStreamPartitionOffset(offsetCriteria,
maxWaitTimeMs);
} catch (Exception e) {
_segmentLogger.warn(
- "Cannot fetch latest stream offset for clientId {} and
partitionGroupId {} with maxWaitTime {}", _clientId,
- _partitionGroupId, maxWaitTimeMs);
+ String.format(
+ "Cannot fetch stream offset with criteria %s for clientId %s and
partitionGroupId %d with maxWaitTime %d",
+ offsetCriteria, _clientId, _partitionGroupId, maxWaitTimeMs), e);
}
return null;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IdleTimerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IdleTimerTest.java
new file mode 100644
index 0000000000..e0adf5bfb1
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IdleTimerTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class IdleTimerTest {
+
+ private static class StaticIdleTimer extends IdleTimer {
+
+ private long _nowTimeMs = 0;
+
+ public StaticIdleTimer() {
+ super();
+ }
+
+ @Override
+ protected long now() {
+ return _nowTimeMs;
+ }
+
+ public void setNowTimeMs(long nowTimeMs) {
+ _nowTimeMs = nowTimeMs;
+ }
+ }
+
+ @Test
+ public void testIdleTimerResetNoIdle() {
+ StaticIdleTimer timer = new StaticIdleTimer();
+ // idle times are all 0 before init
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+ // start times are all 1000L
+ timer.setNowTimeMs(1000L);
+ timer.init();
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+ // new now time should affect idle time
+ timer.setNowTimeMs(2000L);
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(),
1000);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 1000);
+ // everything resets to 2000
+ timer.init();
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+ }
+
+ @Test
+ public void testOnlyResetStreamIdleTime() {
+ StaticIdleTimer timer = new StaticIdleTimer();
+ timer.setNowTimeMs(1000L);
+ timer.init();
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+ // only stream idle time resets
+ timer.setNowTimeMs(2000L);
+ timer.markStreamCreated();
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 1000);
+ // everything resets to 0
+ timer.init();
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+ }
+
+ @Test
+ public void testMultipleIdleResets() {
+ StaticIdleTimer timer = new StaticIdleTimer();
+ timer.setNowTimeMs(1000L);
+ timer.init();
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+ // new now time should affect idle time
+ timer.setNowTimeMs(2000L);
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(),
1000);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 1000);
+ // only stream idle time resets
+ timer.markStreamCreated();
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 1000);
+ // everything resets to 0
+ timer.setNowTimeMs(3000L);
+ timer.markEventConsumed();
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+ // later now times should affect idle time
+ timer.setNowTimeMs(4000L);
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(),
1000);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 1000);
+ // only stream idle time resets
+ timer.setNowTimeMs(5000L);
+ timer.markStreamCreated();
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 2000);
+ // later now time should only increase both idle times
+ timer.setNowTimeMs(6000L);
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(),
1000);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 3000);
+ // everything resets to 0
+ timer.init();
+ Assert.assertEquals(timer.getTimeSinceStreamLastCreatedOrConsumedMs(), 0);
+ Assert.assertEquals(timer.getTimeSinceEventLastConsumedMs(), 0);
+ }
+}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index d758ff627a..4704441372 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -313,10 +313,14 @@ public abstract class BaseServerStarter implements
ServiceStartable {
// are accidentally enabled together. The freshness based checker is a
stricter version of the offset based
// checker. But in the end, both checkers are bounded in time by
realtimeConsumptionCatchupWaitMs.
if (isFreshnessStatusCheckerEnabled) {
- LOGGER.info("Setting up freshness based status checker");
+ int idleTimeoutMs =
_serverConf.getProperty(Server.CONFIG_OF_REALTIME_FRESHNESS_IDLE_TIMEOUT_MS,
+ Server.DEFAULT_REALTIME_FRESHNESS_IDLE_TIMEOUT_MS);
+
+ LOGGER.info("Setting up freshness based status checker with min
freshness {} and idle timeout {}",
+ realtimeMinFreshnessMs, idleTimeoutMs);
FreshnessBasedConsumptionStatusChecker freshnessStatusChecker =
new
FreshnessBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(),
consumingSegments,
- realtimeMinFreshnessMs);
+ realtimeMinFreshnessMs, idleTimeoutMs);
Supplier<Integer> getNumConsumingSegmentsNotReachedMinFreshness =
freshnessStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
serviceStatusCallbackListBuilder.add(
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
index 3cf3dd3587..abce5d5aaa 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.java
@@ -35,11 +35,13 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
*/
public class FreshnessBasedConsumptionStatusChecker extends
IngestionBasedConsumptionStatusChecker {
private final long _minFreshnessMs;
+ private final long _idleTimeoutMs;
public FreshnessBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager, Set<String> consumingSegments,
- long minFreshnessMs) {
+ long minFreshnessMs, long idleTimeoutMs) {
super(instanceDataManager, consumingSegments);
_minFreshnessMs = minFreshnessMs;
+ _idleTimeoutMs = idleTimeoutMs;
}
private boolean isOffsetCaughtUp(StreamPartitionMsgOffset currentOffset,
StreamPartitionMsgOffset latestOffset) {
@@ -52,6 +54,10 @@ public class FreshnessBasedConsumptionStatusChecker extends
IngestionBasedConsum
return false;
}
+ private boolean segmentHasBeenIdleLongerThanThreshold(long segmentIdleTime) {
+ return _idleTimeoutMs > 0 && segmentIdleTime > _idleTimeoutMs;
+ }
+
protected long now() {
return System.currentTimeMillis();
}
@@ -76,15 +82,27 @@ public class FreshnessBasedConsumptionStatusChecker extends
IngestionBasedConsum
StreamPartitionMsgOffset currentOffset =
rtSegmentDataManager.getCurrentOffset();
StreamPartitionMsgOffset latestStreamOffset =
rtSegmentDataManager.fetchLatestStreamOffset(5000);
if (isOffsetCaughtUp(currentOffset, latestStreamOffset)) {
- _logger.info("Segment {} with freshness {}ms has not caught up within
min freshness {}."
+ _logger.info("Segment {} with freshness {}ms has not caught up within
min freshness {}. "
+ "But the current ingested offset is equal to the latest
available offset {}.", segmentName, freshnessMs,
_minFreshnessMs, currentOffset);
return true;
}
+ StreamPartitionMsgOffset earliestStreamOffset =
rtSegmentDataManager.fetchEarliestStreamOffset(5000);
+
+ long idleTimeMs = rtSegmentDataManager.getTimeSinceEventLastConsumedMs();
+ if (segmentHasBeenIdleLongerThanThreshold(idleTimeMs)) {
+ _logger.warn("Segment {} with freshness {}ms has not caught up within
min freshness {}. "
+ + "But the current ingested offset {} has been idle for {}ms. At
offset {}. Earliest offset {}. "
+ + "Latest offset {}.", segmentName, freshnessMs,
_minFreshnessMs, currentOffset, idleTimeMs,
+ currentOffset,
+ earliestStreamOffset, latestStreamOffset);
+ return true;
+ }
+
_logger.info("Segment {} with freshness {}ms has not caught up within "
- + "min freshness {}. At offset {}. Latest offset {}.",
- segmentName, freshnessMs, _minFreshnessMs, currentOffset,
latestStreamOffset);
+ + "min freshness {}. At offset {}. Earliest offset {}. Latest
offset {}.", segmentName, freshnessMs,
+ _minFreshnessMs, currentOffset, earliestStreamOffset,
latestStreamOffset);
return false;
}
}
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
index 7f48e2f9f0..cab178bc0d 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusCheckerTest.java
@@ -42,8 +42,8 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
private final long _now;
public FakeFreshnessBasedConsumptionStatusChecker(InstanceDataManager
instanceDataManager,
- Set<String> consumingSegments, long minFreshnessMs, long now) {
- super(instanceDataManager, consumingSegments, minFreshnessMs);
+ Set<String> consumingSegments, long minFreshnessMs, long
idleTimeoutMs, long now) {
+ super(instanceDataManager, consumingSegments, minFreshnessMs,
idleTimeoutMs);
_now = now;
}
@@ -61,7 +61,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10000L);
+ new FreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10000L, 0L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -136,8 +136,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(
- instanceDataManager, consumingSegments, 10L, 100L);
+ new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, 0L, 100L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -191,6 +190,130 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
}
+ @Test
+ public void regularCaseWithIdleTimeout() {
+ String segA0 = "tableA__0__0__123Z";
+ String segA1 = "tableA__1__0__123Z";
+ String segB0 = "tableB__0__0__123Z";
+ Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+ long idleTimeoutMs = 10L;
+ FreshnessBasedConsumptionStatusChecker statusChecker =
+ new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, idleTimeoutMs,
+ 100L);
+
+ // TableDataManager is not set up yet
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
+
+ // setup TableDataManagers
+ TableDataManager tableDataManagerA = mock(TableDataManager.class);
+ TableDataManager tableDataManagerB = mock(TableDataManager.class);
+
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+ // setup SegmentDataManagers
+ LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
+ LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
+ LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+ when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+ when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+ when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new
LongMsgOffset(20));
+ when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new
LongMsgOffset(20));
+ when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new
LongMsgOffset(20));
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ // ensure negative values are ignored
+ setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
+ setupLatestIngestionTimestamp(segMngrA1, -1L);
+ setupLatestIngestionTimestamp(segMngrB0, 0L);
+
+ when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+ when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+ when(segMngrB0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+
+ // total idle time
+ // segA0 0
+ // segA1 0
+ // segB0 0
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
+
+ when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs
- 1);
+
when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs);
+ when(segMngrB0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs
+ 1);
+ // total idle time
+ // segA0 9
+ // segA1 10
+ // segB0 11
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
2);
+
+
when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs);
+ when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs
+ 1);
+ // total idle time
+ // segA0 10
+ // segA1 11
+ // segB0 11
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
1);
+
+ when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(idleTimeoutMs
+ 1);
+ // total idle time
+ // segA0 11
+ // segA1 11
+ // segB0 11
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
0);
+ }
+
+ @Test
+ public void
testSegmentsNeverHealthyWhenIdleTimeoutZeroAndNoOtherCriteriaMet() {
+ String segA0 = "tableA__0__0__123Z";
+ String segA1 = "tableA__1__0__123Z";
+ String segB0 = "tableB__0__0__123Z";
+ Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
+ InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+ FreshnessBasedConsumptionStatusChecker statusChecker =
+ new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, 0L, 100L);
+
+ // TableDataManager is not set up yet
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
+
+ // setup TableDataManagers
+ TableDataManager tableDataManagerA = mock(TableDataManager.class);
+ TableDataManager tableDataManagerB = mock(TableDataManager.class);
+
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
+
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(tableDataManagerB);
+
+ // setup SegmentDataManagers
+ LLRealtimeSegmentDataManager segMngrA0 =
mock(LLRealtimeSegmentDataManager.class);
+ LLRealtimeSegmentDataManager segMngrA1 =
mock(LLRealtimeSegmentDataManager.class);
+ LLRealtimeSegmentDataManager segMngrB0 =
mock(LLRealtimeSegmentDataManager.class);
+ when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
+ when(tableDataManagerA.acquireSegment(segA1)).thenReturn(segMngrA1);
+ when(tableDataManagerB.acquireSegment(segB0)).thenReturn(segMngrB0);
+
+ when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new
LongMsgOffset(20));
+ when(segMngrA1.fetchLatestStreamOffset(5000)).thenReturn(new
LongMsgOffset(20));
+ when(segMngrB0.fetchLatestStreamOffset(5000)).thenReturn(new
LongMsgOffset(20));
+ when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ when(segMngrA1.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ when(segMngrB0.getCurrentOffset()).thenReturn(new LongMsgOffset(10));
+ // ensure negative values are ignored
+ setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);
+ setupLatestIngestionTimestamp(segMngrA1, -1L);
+ setupLatestIngestionTimestamp(segMngrB0, 0L);
+
+ when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+ when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+ when(segMngrB0.getTimeSinceEventLastConsumedMs()).thenReturn(0L);
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
+
+ when(segMngrA0.getTimeSinceEventLastConsumedMs()).thenReturn(10L);
+ when(segMngrA1.getTimeSinceEventLastConsumedMs()).thenReturn(100L);
+ when(segMngrB0.getTimeSinceEventLastConsumedMs()).thenReturn(1000L);
+
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
+ }
+
@Test
public void segmentBeingCommmitted() {
String segA0 = "tableA__0__0__123Z";
@@ -199,7 +322,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, 100L);
+ new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, 0L, 100L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
@@ -257,7 +380,7 @@ public class FreshnessBasedConsumptionStatusCheckerTest {
Set<String> consumingSegments = ImmutableSet.of(segA0, segA1, segB0);
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
- new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, 100L);
+ new FakeFreshnessBasedConsumptionStatusChecker(instanceDataManager,
consumingSegments, 10L, 0L, 100L);
// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(),
3);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index d77a8a9e93..775ff1fc58 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -549,6 +549,18 @@ public class CommonConstants {
public static final String
CONFIG_OF_ENABLE_REALTIME_FRESHNESS_BASED_CONSUMPTION_STATUS_CHECKER =
"pinot.server.starter.enableRealtimeFreshnessBasedConsumptionStatusChecker";
public static final boolean
DEFAULT_ENABLE_REALTIME_FRESHNESS_BASED_CONSUMPTION_STATUS_CHECKER = false;
+ // This configuration is in place to avoid servers getting stuck checking for freshness in
+ // cases where they will never be able to reach the freshness threshold or the latest offset.
+ // The only current case where we have seen this is low volume streams using read_committed
+ // because of transactional publishes where the last message in the stream is an
+ // un-consumable Kafka control message, and it is impossible to tell if the consumer is stuck
+ // or some offsets will never be consumable.
+ //
+ // When in doubt, do not enable this configuration as it can cause a lagged server to start
+ // serving queries.
+ public static final String CONFIG_OF_REALTIME_FRESHNESS_IDLE_TIMEOUT_MS =
+ "pinot.server.starter.realtimeFreshnessIdleTimeoutMs";
+ public static final int DEFAULT_REALTIME_FRESHNESS_IDLE_TIMEOUT_MS = 0;
public static final String CONFIG_OF_STARTUP_REALTIME_MIN_FRESHNESS_MS =
"pinot.server.starter.realtimeMinFreshnessMs";
// Use 10 seconds by default so high volume stream are able to catch up.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]