KKcorps commented on code in PR #12697:
URL: https://github.com/apache/pinot/pull/12697#discussion_r1538609651


##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -52,220 +46,112 @@
  */
 public class KinesisConsumer extends KinesisConnectionHandler implements 
PartitionGroupConsumer {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(KinesisConsumer.class);
-  public static final long SLEEP_TIME_BETWEEN_REQUESTS = 1000L;
-  private final String _streamTopicName;
-  private final int _numMaxRecordsToFetch;
-  private final ExecutorService _executorService;
-  private final ShardIteratorType _shardIteratorType;
-  private final int _rpsLimit;
 
-  public KinesisConsumer(KinesisConfig kinesisConfig) {
-    super(kinesisConfig);
-    _streamTopicName = kinesisConfig.getStreamTopicName();
-    _numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch();
-    _shardIteratorType = kinesisConfig.getShardIteratorType();
-    _rpsLimit = kinesisConfig.getRpsLimit();
-    _executorService = Executors.newSingleThreadExecutor();
+  private int _currentSecond = 0;
+  private int _numRequestsInCurrentSecond = 0;
+
+  public KinesisConsumer(KinesisConfig config) {
+    super(config);
+    LOGGER.info("Created Kinesis consumer with topic: {}, RPS limit: {}, max records per fetch: {}",
+        config.getStreamTopicName(), config.getRpsLimit(), config.getNumMaxRecordsToFetch());
   }
 
   @VisibleForTesting
-  public KinesisConsumer(KinesisConfig kinesisConfig, KinesisClient 
kinesisClient) {
-    super(kinesisConfig, kinesisClient);
-    _kinesisClient = kinesisClient;
-    _streamTopicName = kinesisConfig.getStreamTopicName();
-    _numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch();
-    _shardIteratorType = kinesisConfig.getShardIteratorType();
-    _rpsLimit = kinesisConfig.getRpsLimit();
-    _executorService = Executors.newSingleThreadExecutor();
+  public KinesisConsumer(KinesisConfig config, KinesisClient kinesisClient) {
+    super(config, kinesisClient);
   }
 
   /**
    * Fetch records from the Kinesis stream between the start and end 
KinesisCheckpoint
    */
   @Override
-  public KinesisRecordsBatch fetchMessages(StreamPartitionMsgOffset 
startCheckpoint,
-      StreamPartitionMsgOffset endCheckpoint, int timeoutMs) {
-    List<KinesisStreamMessage> recordList = new ArrayList<>();
-    Future<KinesisRecordsBatch> kinesisFetchResultFuture =
-        _executorService.submit(() -> getResult(startCheckpoint, 
endCheckpoint, recordList));
-
-    try {
-      return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
-    } catch (TimeoutException e) {
-      kinesisFetchResultFuture.cancel(true);
-      return handleException((KinesisPartitionGroupOffset) startCheckpoint, 
recordList);
-    } catch (Exception e) {
-      return handleException((KinesisPartitionGroupOffset) startCheckpoint, 
recordList);
-    }
-  }
-
-  private KinesisRecordsBatch getResult(StreamPartitionMsgOffset startOffset, 
StreamPartitionMsgOffset endOffset,
-      List<KinesisStreamMessage> recordList) {
-    KinesisPartitionGroupOffset kinesisStartCheckpoint = 
(KinesisPartitionGroupOffset) startOffset;
-
-    try {
-      if (_kinesisClient == null) {
-        createConnection();
-      }
-
-      // TODO: iterate upon all the shardIds in the map
-      //  Okay for now, since we have assumed that every partition group 
contains a single shard
-      Map<String, String> startShardToSequenceMap = 
kinesisStartCheckpoint.getShardToStartSequenceMap();
-      Preconditions.checkState(startShardToSequenceMap.size() == 1,
-          "Only 1 shard per consumer supported. Found: %s, in 
startShardToSequenceMap",
-          startShardToSequenceMap.keySet());
-      Map.Entry<String, String> startShardToSequenceNum = 
startShardToSequenceMap.entrySet().iterator().next();
-      String shardIterator = 
getShardIterator(startShardToSequenceNum.getKey(), 
startShardToSequenceNum.getValue());
-
-      String kinesisEndSequenceNumber = null;
-
-      if (endOffset != null) {
-        KinesisPartitionGroupOffset kinesisEndCheckpoint = 
(KinesisPartitionGroupOffset) endOffset;
-        Map<String, String> endShardToSequenceMap = 
kinesisEndCheckpoint.getShardToStartSequenceMap();
-        Preconditions.checkState(endShardToSequenceMap.size() == 1,
-            "Only 1 shard per consumer supported. Found: %s, in 
endShardToSequenceMap", endShardToSequenceMap.keySet());
-        kinesisEndSequenceNumber = 
endShardToSequenceMap.values().iterator().next();
-      }
-
-      String nextStartSequenceNumber;
-      boolean isEndOfShard = false;
-      long currentWindow = System.currentTimeMillis() / 
SLEEP_TIME_BETWEEN_REQUESTS;
-      int currentWindowRequests = 0;
-      while (shardIterator != null) {
-        GetRecordsRequest getRecordsRequest = 
GetRecordsRequest.builder().shardIterator(shardIterator).build();
-
-        long requestSentTime = System.currentTimeMillis() / 1000;
-        GetRecordsResponse getRecordsResponse = 
_kinesisClient.getRecords(getRecordsRequest);
-
-        if (!getRecordsResponse.records().isEmpty()) {
-          getRecordsResponse.records().forEach(record -> {
-            recordList.add(
-            new 
KinesisStreamMessage(record.partitionKey().getBytes(StandardCharsets.UTF_8),
-                record.data().asByteArray(), record.sequenceNumber(),
-                (KinesisStreamMessageMetadata) 
_kinesisMetadataExtractor.extract(record),
-                record.data().asByteArray().length));
-          });
-          nextStartSequenceNumber = recordList.get(recordList.size() - 
1).sequenceNumber();
-
-          if (kinesisEndSequenceNumber != null && 
kinesisEndSequenceNumber.compareTo(nextStartSequenceNumber) <= 0) {
-            break;
-          }
-
-          if (recordList.size() >= _numMaxRecordsToFetch) {
-            break;
-          }
-        }
-
-        if (getRecordsResponse.hasChildShards() && 
!getRecordsResponse.childShards().isEmpty()) {
-          //This statement returns true only when end of current shard has 
reached.
-          // hasChildShards only checks if the childShard is null and is a 
valid instance.
-          isEndOfShard = true;
-          break;
-        }
-
-        shardIterator = getRecordsResponse.nextShardIterator();
-
-        if (Thread.interrupted()) {
-          break;
-        }
-
-        // Kinesis enforces a limit of 5 .getRecords request per second on 
each shard from AWS end
-        // Beyond this limit we start getting 
ProvisionedThroughputExceededException which affect the ingestion
-        if (requestSentTime == currentWindow) {
-          currentWindowRequests++;
-        } else if (requestSentTime > currentWindow) {
-          currentWindow = requestSentTime;
-          currentWindowRequests = 0;
-        }
-
-        if (currentWindowRequests >= _rpsLimit) {
-          try {
-            Thread.sleep(SLEEP_TIME_BETWEEN_REQUESTS);
-          } catch (InterruptedException e) {
-            LOGGER.debug("Sleep interrupted while rate limiting Kinesis 
requests", e);
-            break;
-          }
+  public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {

Review Comment:
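   How do we enforce `timeoutMs` here? The previous implementation ran the fetch on an executor and bounded it with `Future.get(timeoutMs, TimeUnit.MILLISECONDS)`, cancelling the task on timeout; with the executor removed, what bounds the duration of this call?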



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to