This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 25b1fcbcf0 Improve tests when errors are received in the consumer thread (#11858)
25b1fcbcf0 is described below
commit 25b1fcbcf0a8003b63af8e59bda109a89dae490d
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Oct 23 23:19:32 2023 +0200
Improve tests when errors are received in the consumer thread (#11858)
---
.../data/manager/realtime/RealtimeSegmentDataManager.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index f3e8fc588e..f954eff0e1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -391,12 +391,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
_serverMetrics.addMeteredTableValue(_tableStreamName,
ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS,
1L);
if (_consecutiveErrorCount > MAX_CONSECUTIVE_ERROR_COUNT) {
- _segmentLogger.warn("Stream transient exception when fetching messages,
stopping consumption after {} attempts",
- _consecutiveErrorCount, e);
+ _segmentLogger.warn("Stream transient exception when fetching messages,
stopping consumption after "
+ + _consecutiveErrorCount + " attempts", e);
throw e;
} else {
- _segmentLogger
- .warn("Stream transient exception when fetching messages, retrying
(count={})", _consecutiveErrorCount, e);
+ _segmentLogger.warn("Stream transient exception when fetching messages,
retrying (count="
+ + _consecutiveErrorCount + ")", e);
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
recreateStreamConsumer("Too many transient errors");
}
@@ -444,6 +444,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
// One such exception seen so far is java.net.SocketTimeoutException
handleTransientStreamErrors(e);
continue;
+ } catch (Throwable t) {
+ _segmentLogger.warn("Stream error when fetching messages, stopping
consumption", t);
+ throw t;
}
boolean endCriteriaReached = processStreamEvents(messageBatch,
idlePipeSleepTimeMillis);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]