This is an automated email from the ASF dual-hosted git repository.
KKcorps pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0a8ac02c8a0 fix(kinesis): throttle
ProvisionedThroughputExceededException logs using ThrottledLogger (#18572)
0a8ac02c8a0 is described below
commit 0a8ac02c8a062ced0f8542f5d0351b0ab2e31db4
Author: swaminathanmanish <[email protected]>
AuthorDate: Tue May 26 15:39:47 2026 +0530
fix(kinesis): throttle ProvisionedThroughputExceededException logs using
ThrottledLogger (#18572)
* Throttle KinesisConsumer rate-limit error logs using ThrottledLogger
ProvisionedThroughputExceededException is caught on every fetch call and
can fire at high frequency across hundreds of shards under sustained AWS
rate limiting, flooding logs and filling ephemeral storage. Replace the
unconditional LOGGER.error with ThrottledLogger (5 logs/min) so the
error remains visible but does not cause disk pressure.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* Throttle KinesisConsumer rate-limit warn logs using ThrottledLogger
Rate-limit exceptions can fire at high frequency across hundreds of
shards under sustained AWS throttling. Replace the bare LOGGER.warn calls
in the fetchMessages retry loop, logRateLimitTimeout, and
logRequestLimiterTimeout with ThrottledLogger (5 logs/min per exception
class) to prevent log flooding and disk pressure while preserving
visibility via dropped-count reporting.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
---------
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
.../pinot-stream-ingestion/pinot-kinesis/pom.xml | 4 +++
.../plugin/stream/kinesis/KinesisConsumer.java | 31 +++++++++++++---------
2 files changed, 22 insertions(+), 13 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 2315fe9f537..61aea5485fd 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -35,6 +35,10 @@
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-common</artifactId>
+ </dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 5c017acc076..3b8b35fd9be 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -30,6 +30,7 @@ import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.spi.stream.BytesStreamMessage;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
@@ -54,7 +55,9 @@ public class KinesisConsumer extends KinesisConnectionHandler
implements Partiti
private static final int MAX_RATE_LIMIT_BACKOFF_MS = 5000;
private static final int RATE_LIMIT_BACKOFF_JITTER_BOUND_MS = 250;
private static final RequestRateLimiter SHARED_REQUEST_RATE_LIMITER = new
SharedKinesisRequestRateLimiter();
+ private static final double RATE_LIMIT_LOG_RATE_PER_MIN = 5.0;
+ private final ThrottledLogger _throttledLogger = new ThrottledLogger(LOGGER,
RATE_LIMIT_LOG_RATE_PER_MIN);
private String _nextStartSequenceNumber = null;
private String _nextShardIterator = null;
private final RequestRateLimiter _requestRateLimiter;
@@ -109,10 +112,11 @@ public class KinesisConsumer extends
KinesisConnectionHandler implements Partiti
return new KinesisMessageBatch(List.of(), startOffset, false, 0);
}
long backoffMs = Math.min(computeRateLimitBackoffMs(attempts),
remainingMs);
- LOGGER.warn("Rate limit exceeded while fetching messages from Kinesis
stream: {}, shard: {}, operation: {}, "
- + "threshold: {}, attempt: {}, backing off for {} ms. Error:
{}", _config.getStreamTopicName(),
- startOffset.getShardId(), e.getRequestType(),
_config.getRpsLimitPerSecond(), attempts, backoffMs,
- e.getCause().getMessage());
+ _throttledLogger.warn(
+ String.format("Rate limit exceeded while fetching messages from
Kinesis stream: %s, shard: %s, "
+ + "operation: %s, threshold: %s, attempt: %d, backing off
for %d ms",
+ _config.getStreamTopicName(), startOffset.getShardId(),
e.getRequestType(),
+ _config.getRpsLimitPerSecond(), attempts, backoffMs),
e.getCause());
sleep(backoffMs);
} catch (KinesisRequestTimeoutException e) {
logRequestLimiterTimeout(startOffset, e);
@@ -220,19 +224,20 @@ public class KinesisConsumer extends
KinesisConnectionHandler implements Partiti
private void logRateLimitTimeout(KinesisPartitionGroupOffset startOffset,
int attempts,
KinesisRateLimitException rateLimitException) {
- LOGGER.warn("Rate limit exceeded while fetching messages from Kinesis
stream: {}, shard: {}, operation: {}, "
- + "threshold: {}, attempts: {}. Fetch timeout exhausted; returning
empty batch at original offset. "
- + "Error: {}",
- _config.getStreamTopicName(), startOffset.getShardId(),
rateLimitException.getRequestType(),
- _config.getRpsLimitPerSecond(), attempts,
rateLimitException.getCause().getMessage());
+ _throttledLogger.warn(
+ String.format("Rate limit exceeded while fetching messages from
Kinesis stream: %s, shard: %s, "
+ + "operation: %s, threshold: %s, attempts: %d. Fetch timeout
exhausted; returning empty batch.",
+ _config.getStreamTopicName(), startOffset.getShardId(),
rateLimitException.getRequestType(),
+ _config.getRpsLimitPerSecond(), attempts),
rateLimitException.getCause());
}
private void logRequestLimiterTimeout(KinesisPartitionGroupOffset
startOffset,
KinesisRequestTimeoutException timeoutException) {
- LOGGER.warn("Timed out waiting for Kinesis request limiter while fetching
messages from stream: {}, shard: {}, "
- + "operation: {}, threshold: {}. Fetch timeout exhausted;
returning empty batch at original offset.",
- _config.getStreamTopicName(), startOffset.getShardId(),
timeoutException.getRequestType(),
- _config.getRpsLimitPerSecond());
+ _throttledLogger.warn(
+ String.format("Timed out waiting for Kinesis request limiter while
fetching messages from stream: %s, "
+ + "shard: %s, operation: %s, threshold: %s. Fetch timeout
exhausted; returning empty batch.",
+ _config.getStreamTopicName(), startOffset.getShardId(),
timeoutException.getRequestType(),
+ _config.getRpsLimitPerSecond()), timeoutException);
}
private BytesStreamMessage extractStreamMessage(Record record, String
shardId) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]