This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 25b1fcbcf0 Improve tests when errors are received in the consumer 
thread (#11858)
25b1fcbcf0 is described below

commit 25b1fcbcf0a8003b63af8e59bda109a89dae490d
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Oct 23 23:19:32 2023 +0200

    Improve tests when errors are received in the consumer thread (#11858)
---
 .../data/manager/realtime/RealtimeSegmentDataManager.java     | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index f3e8fc588e..f954eff0e1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -391,12 +391,12 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     _serverMetrics.addMeteredTableValue(_tableStreamName, 
ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS,
         1L);
     if (_consecutiveErrorCount > MAX_CONSECUTIVE_ERROR_COUNT) {
-      _segmentLogger.warn("Stream transient exception when fetching messages, 
stopping consumption after {} attempts",
-          _consecutiveErrorCount, e);
+      _segmentLogger.warn("Stream transient exception when fetching messages, 
stopping consumption after "
+          + _consecutiveErrorCount + " attempts", e);
       throw e;
     } else {
-      _segmentLogger
-          .warn("Stream transient exception when fetching messages, retrying 
(count={})", _consecutiveErrorCount, e);
+      _segmentLogger.warn("Stream transient exception when fetching messages, 
retrying (count="
+          + _consecutiveErrorCount + ")", e);
       Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
       recreateStreamConsumer("Too many transient errors");
     }
@@ -444,6 +444,9 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
         // One such exception seen so far is java.net.SocketTimeoutException
         handleTransientStreamErrors(e);
         continue;
+      } catch (Throwable t) {
+        _segmentLogger.warn("Stream error when fetching messages, stopping 
consumption", t);
+        throw t;
       }
 
       boolean endCriteriaReached = processStreamEvents(messageBatch, 
idlePipeSleepTimeMillis);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to