zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1401103893


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -294,22 +295,18 @@ private Runnable fetchRecords()
 
             // If the buffer was full and we weren't able to add the message, 
grab a new stream iterator starting
             // from this message and back off for a bit to let the buffer 
drain before retrying.
-            if (!records.offer(currRecord, recordBufferOfferTimeout, 
TimeUnit.MILLISECONDS)) {
+            recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+            while (!records.offer(
+                new 
MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, recordSize),
+                recordBufferOfferWaitMillis,
+                TimeUnit.MILLISECONDS
+            )) {
               log.warn(
                   "Kinesis records are being processed slower than they are 
fetched. "
                   + "OrderedPartitionableRecord buffer full, storing iterator 
and retrying in [%,dms].",
                   recordBufferFullWait
               );
-
-              shardIterator = kinesis.getShardIterator(
-                  currRecord.getStream(),
-                  currRecord.getPartitionId(),
-                  ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-                  currRecord.getSequenceNumber()
-              ).getShardIterator();
-
-              scheduleBackgroundFetch(recordBufferFullWait);
-              return;
+              recordBufferOfferWaitMillis = recordBufferFullWait;

Review Comment:
   Previouslt when the record buffer is full here, the fetchRecords logic threw 
away the rest of the GetRecords result after recordBufferOfferTimeout and 
starts a new shard iterator. This seemed excessively churny. Instead we wait an 
unbounded amount of time for queue to stop being full. If the queue remains 
full, we’ll end up right back waiting for it after the restarted fetch.



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -294,22 +295,18 @@ private Runnable fetchRecords()
 
             // If the buffer was full and we weren't able to add the message, 
grab a new stream iterator starting
             // from this message and back off for a bit to let the buffer 
drain before retrying.
-            if (!records.offer(currRecord, recordBufferOfferTimeout, 
TimeUnit.MILLISECONDS)) {
+            recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+            while (!records.offer(
+                new 
MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, recordSize),
+                recordBufferOfferWaitMillis,
+                TimeUnit.MILLISECONDS
+            )) {
               log.warn(
                   "Kinesis records are being processed slower than they are 
fetched. "
                   + "OrderedPartitionableRecord buffer full, storing iterator 
and retrying in [%,dms].",
                   recordBufferFullWait
               );
-
-              shardIterator = kinesis.getShardIterator(
-                  currRecord.getStream(),
-                  currRecord.getPartitionId(),
-                  ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-                  currRecord.getSequenceNumber()
-              ).getShardIterator();
-
-              scheduleBackgroundFetch(recordBufferFullWait);
-              return;
+              recordBufferOfferWaitMillis = recordBufferFullWait;

Review Comment:
   Previouslt when the record buffer is full here, the fetchRecords logic threw 
away the rest of the GetRecords result after recordBufferOfferTimeout and 
starts a new shard iterator. This seemed excessively churny. Instead we wait an 
unbounded amount of time for queue to stop being full. If the queue remains 
full, we’ll end up right back waiting for it after the restarted fetch.



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -294,22 +295,18 @@ private Runnable fetchRecords()
 
             // If the buffer was full and we weren't able to add the message, 
grab a new stream iterator starting
             // from this message and back off for a bit to let the buffer 
drain before retrying.
-            if (!records.offer(currRecord, recordBufferOfferTimeout, 
TimeUnit.MILLISECONDS)) {
+            recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+            while (!records.offer(
+                new 
MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, recordSize),
+                recordBufferOfferWaitMillis,
+                TimeUnit.MILLISECONDS
+            )) {
               log.warn(
                   "Kinesis records are being processed slower than they are 
fetched. "
                   + "OrderedPartitionableRecord buffer full, storing iterator 
and retrying in [%,dms].",
                   recordBufferFullWait
               );
-
-              shardIterator = kinesis.getShardIterator(
-                  currRecord.getStream(),
-                  currRecord.getPartitionId(),
-                  ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-                  currRecord.getSequenceNumber()
-              ).getShardIterator();
-
-              scheduleBackgroundFetch(recordBufferFullWait);
-              return;
+              recordBufferOfferWaitMillis = recordBufferFullWait;

Review Comment:
   Previously when the record buffer is full here, the fetchRecords logic threw 
away the rest of the GetRecords result after recordBufferOfferTimeout and 
starts a new shard iterator. This seemed excessively churny. Instead we wait an 
unbounded amount of time for queue to stop being full. If the queue remains 
full, we’ll end up right back waiting for it after the restarted fetch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to