zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1401103893
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -294,22 +295,18 @@ private Runnable fetchRecords()
           // If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
           // from this message and back off for a bit to let the buffer drain before retrying.
-          if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
+          recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+          while (!records.offer(
+              new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, recordSize),
+              recordBufferOfferWaitMillis,
+              TimeUnit.MILLISECONDS
+          )) {
             log.warn(
                 "Kinesis records are being processed slower than they are fetched. "
                 + "OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms].",
                 recordBufferFullWait
             );
-
-            shardIterator = kinesis.getShardIterator(
-                currRecord.getStream(),
-                currRecord.getPartitionId(),
-                ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-                currRecord.getSequenceNumber()
-            ).getShardIterator();
-
-            scheduleBackgroundFetch(recordBufferFullWait);
-            return;
+            recordBufferOfferWaitMillis = recordBufferFullWait;
Review Comment:
Previously, when the record buffer was full here, the fetchRecords logic threw away the rest of the GetRecords result after recordBufferOfferTimeout and started a new shard iterator. This seemed excessively churny. Instead, we now wait an unbounded amount of time for the queue to stop being full. If the queue remained full, we'd end up right back waiting on it after the restarted fetch anyway.
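The offer-retry pattern described above can be sketched outside Druid with a plain `BlockingQueue`. This is a minimal illustration, not the PR's code: the timeout constants and the `offerUntilAccepted` helper are hypothetical stand-ins for `recordBufferOfferTimeout` / `recordBufferFullWait` and the loop in `fetchRecords()`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class OfferRetrySketch {
  // Hypothetical stand-ins for the PR's recordBufferOfferTimeout / recordBufferFullWait.
  static final long OFFER_TIMEOUT_MS = 10;
  static final long FULL_WAIT_MS = 50;

  // Keep retrying the offer until the record fits, instead of dropping the
  // rest of the batch and restarting the shard iterator (the old behavior).
  static void offerUntilAccepted(BlockingQueue<String> buffer, String record)
      throws InterruptedException {
    long waitMillis = OFFER_TIMEOUT_MS;              // first attempt: short timeout
    while (!buffer.offer(record, waitMillis, TimeUnit.MILLISECONDS)) {
      // Buffer is full; the real code logs a warning here. Back off longer.
      waitMillis = FULL_WAIT_MS;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> buffer = new ArrayBlockingQueue<>(1);
    buffer.put("first");                             // fill the buffer to capacity

    // A consumer drains the queue a bit later, so the blocked offer succeeds.
    Thread consumer = new Thread(() -> {
      try {
        Thread.sleep(100);
        buffer.take();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    offerUntilAccepted(buffer, "second");            // retries until space frees up
    consumer.join();
    System.out.println(buffer.peek());               // prints "second"
  }
}
```

The key difference from the pre-PR behavior is that no work is discarded while waiting: the fetched record stays in hand and the loop simply retries, so there is no extra GetShardIterator call and no re-fetch of already-retrieved records.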
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]