This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new dd8a6477c2 Reduce Kinesis default rate limit to 1 to account for
replication (#13649)
dd8a6477c2 is described below
commit dd8a6477c22e761a2a6dc3264b9c2a86c94427fd
Author: Kartik Khare <[email protected]>
AuthorDate: Wed Jul 31 12:47:24 2024 +0530
Reduce Kinesis default rate limit to 1 to account for replication (#13649)
* Use debug logs in case we run into rate limit exceeded exception
* lower kinesis rate limit
* Fix exception
---------
Co-authored-by: Kartik Khare <[email protected]>
---
.../apache/pinot/plugin/stream/kinesis/KinesisConfig.java | 8 +++++++-
.../pinot/plugin/stream/kinesis/KinesisConsumer.java | 14 ++++++++++++--
2 files changed, 19 insertions(+), 3 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 529e218e90..6f84407006 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -70,7 +70,13 @@ public class KinesisConfig {
public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true";
- public static final String DEFAULT_RPS_LIMIT = "5";
+
+ // Kinesis has a default limit of 5 getRecord requests per second per shard.
+ // This limit is enforced by Kinesis and is not configurable.
+ // We are setting it to 1 to avoid hitting the limit in a replicated setup,
+ // where multiple replicas are fetching from the same shard.
+ // see -
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
+ public static final String DEFAULT_RPS_LIMIT = "1";
private final String _streamTopicName;
private final String _awsRegion;
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index e7bb76797a..d90b1b61bb 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -63,7 +63,17 @@ public class KinesisConsumer extends
KinesisConnectionHandler implements Partiti
@Override
public synchronized KinesisMessageBatch
fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
- KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset)
startMsgOffset;
+ try {
+ return getKinesisMessageBatch((KinesisPartitionGroupOffset)
startMsgOffset);
+ } catch (ProvisionedThroughputExceededException pte) {
+ LOGGER.error("Rate limit exceeded while fetching messages from Kinesis
stream: {} with threshold: {}",
+ pte.getMessage(), _config.getRpsLimit());
+ return new KinesisMessageBatch(List.of(), (KinesisPartitionGroupOffset)
startMsgOffset, false);
+ }
+ }
+
+ private KinesisMessageBatch
getKinesisMessageBatch(KinesisPartitionGroupOffset startMsgOffset) {
+ KinesisPartitionGroupOffset startOffset = startMsgOffset;
String shardId = startOffset.getShardId();
String startSequenceNumber = startOffset.getSequenceNumber();
// Get the shard iterator
@@ -122,7 +132,7 @@ public class KinesisConsumer extends
KinesisConnectionHandler implements Partiti
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
- _currentSecond++;
+ _currentSecond = (int)
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
_numRequestsInCurrentSecond = 1;
} else {
_numRequestsInCurrentSecond++;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]