Jackie-Jiang commented on code in PR #12806:
URL: https://github.com/apache/pinot/pull/12806#discussion_r1556472410
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -69,113 +60,56 @@ public KinesisConsumer(KinesisConfig config, KinesisClient kinesisClient) {
super(config, kinesisClient);
}
- /**
- * Fetch records from the Kinesis stream between the start and end KinesisCheckpoint
- */
@Override
- public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
+ public synchronized KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
KinesisPartitionGroupOffset startOffset = (KinesisPartitionGroupOffset) startMsgOffset;
- List<BytesStreamMessage> messages = new ArrayList<>();
- Future<KinesisMessageBatch> kinesisFetchResultFuture =
- _executorService.submit(() -> getResult(startOffset, messages));
- try {
- return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- kinesisFetchResultFuture.cancel(true);
- } catch (Exception e) {
- // Ignored
- }
- return buildKinesisMessageBatch(startOffset, messages, false);
- }
-
- private KinesisMessageBatch getResult(KinesisPartitionGroupOffset startOffset, List<BytesStreamMessage> messages) {
- try {
- String shardId = startOffset.getShardId();
- String shardIterator = getShardIterator(shardId, startOffset.getSequenceNumber());
- boolean endOfShard = false;
- long currentWindow = System.currentTimeMillis() / SLEEP_TIME_BETWEEN_REQUESTS;
- int currentWindowRequests = 0;
- while (shardIterator != null) {
- GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
- long requestSentTime = System.currentTimeMillis() / 1000;
- GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
- List<Record> records = getRecordsResponse.records();
- if (!records.isEmpty()) {
- for (Record record : records) {
- messages.add(extractStreamMessage(record, shardId));
- }
- if (messages.size() >= _config.getNumMaxRecordsToFetch()) {
- break;
- }
+ String shardId = startOffset.getShardId();
+ String startSequenceNumber = startOffset.getSequenceNumber();
+
+ // NOTE: Kinesis enforces a limit of 5 getRecords request per second on each shard from AWS end, beyond which we
+ // start getting ProvisionedThroughputExceededException. Rate limit the requests to avoid this.
+ long currentTimeMs = System.currentTimeMillis();
Review Comment:
I didn't find one in the Kinesis client. It seems it will just throw
`LimitExceededException`.
The `rps` is currently configured on the Pinot side, though, so I guess it
makes sense to rate limit on the Pinot side.
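
For illustration, here is a minimal sketch of what rate limiting on the Pinot side could look like, assuming a fixed one-second window and a configured requests-per-second cap. `RequestThrottle`, `tryAcquire`, and the injected clock are hypothetical names made up for this example; this is not the code in the PR and not a Kinesis client API.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not part of Pinot or the AWS SDK): allows at most
 * maxRequestsPerSecond calls within each one-second window.
 */
class RequestThrottle {
  private static final long WINDOW_MS = 1000L;

  private final int _maxRequestsPerSecond;
  private final LongSupplier _clockMs;
  private long _windowStartMs;
  private int _requestsInWindow;

  RequestThrottle(int maxRequestsPerSecond, LongSupplier clockMs) {
    _maxRequestsPerSecond = maxRequestsPerSecond;
    _clockMs = clockMs;
    _windowStartMs = clockMs.getAsLong();
  }

  /**
   * Returns 0 if a request may be sent now (and counts it against the window),
   * otherwise the number of milliseconds until the current window ends.
   */
  synchronized long tryAcquire() {
    long nowMs = _clockMs.getAsLong();
    if (nowMs - _windowStartMs >= WINDOW_MS) {
      // A full window has elapsed: start a new one and reset the counter.
      _windowStartMs = nowMs;
      _requestsInWindow = 0;
    }
    if (_requestsInWindow < _maxRequestsPerSecond) {
      _requestsInWindow++;
      return 0L;
    }
    return _windowStartMs + WINDOW_MS - nowMs;
  }
}
```

A caller would sleep for the returned number of milliseconds (or return an empty batch) before issuing the next `getRecords` request. Injecting the clock as a `LongSupplier` such as `System::currentTimeMillis` keeps the window logic testable without real waiting.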