gianm commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1408828126
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -637,21 +622,22 @@ public List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long ti
try {
int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll);
- List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = new ArrayList<>(expectedSize);
+ List<MemoryBoundLinkedBlockingQueue.ObjectContainer<OrderedPartitionableRecord<String, String, ByteEntity>>> polledRecords = new ArrayList<>(expectedSize);
- Queues.drain(
- records,
+ records.drain(
polledRecords,
- expectedSize,
+ MAX_BYTES_PER_POLL,
Review Comment:
It looks like `maxRecordsPerPoll` isn't doing anything anymore. Is that
right? If so, let's get rid of it.
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -1059,11 +1045,23 @@ private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> pa
}
// filter records in buffer and only retain ones whose partition was not seeked
- BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
+ MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ =
+ new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
records.stream()
- .filter(x -> !partitions.contains(x.getStreamPartition()))
- .forEachOrdered(newQ::offer);
+ .filter(x -> !partitions.contains(x.getData().getStreamPartition()))
+ .forEachOrdered(x -> {
+ if (!newQ.offer(x)) {
+ // this should never really happen in practice but adding check here for safety.
Review Comment:
Checks for conditions that should never happen, but are kept for safety,
should throw `DruidException.defensive`.
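
To illustrate the pattern under discussion, here is a minimal, self-contained sketch of re-filtering a buffer into a byte-bounded queue and failing loudly if an `offer` is rejected. Everything in it is an assumption made for illustration: `Rec` and `ByteBoundQueue` are hypothetical stand-ins, not Druid's `OrderedPartitionableRecord` or `MemoryBoundLinkedBlockingQueue`, and `IllegalStateException` stands in for the `DruidException.defensive` failure so the example has no Druid dependency.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

final class DefensiveRefilterSketch {
    /** Hypothetical record: a partition id plus a payload. */
    static final class Rec {
        final String partition;
        final byte[] payload;

        Rec(String partition, byte[] payload) {
            this.partition = partition;
            this.payload = payload;
        }
    }

    /** Hypothetical byte-bounded queue; NOT Druid's MemoryBoundLinkedBlockingQueue. */
    static final class ByteBoundQueue {
        private final long capacityBytes;
        private final AtomicLong usedBytes = new AtomicLong();
        private final LinkedBlockingQueue<Rec> delegate = new LinkedBlockingQueue<>();

        ByteBoundQueue(long capacityBytes) {
            this.capacityBytes = capacityBytes;
        }

        /** Reserves the payload's bytes; rejects the record if that exceeds capacity. */
        boolean offer(Rec r) {
            long size = r.payload.length;
            if (usedBytes.addAndGet(size) > capacityBytes) {
                usedBytes.addAndGet(-size); // roll back the reservation
                return false;
            }
            return delegate.offer(r); // delegate is unbounded, so this succeeds
        }

        int size() {
            return delegate.size();
        }

        long usedBytes() {
            return usedBytes.get();
        }
    }

    /** Copies records whose partition was not seeked into a fresh queue. */
    static ByteBoundQueue refilter(List<Rec> buffered, Set<String> seeked, long capacityBytes) {
        ByteBoundQueue newQ = new ByteBoundQueue(capacityBytes);
        for (Rec r : buffered) {
            if (seeked.contains(r.partition)) {
                continue; // drop records for partitions that were seeked
            }
            if (!newQ.offer(r)) {
                // Stand-in for a defensive failure: a subset of the old buffer
                // should always fit in a queue of the same byte capacity.
                throw new IllegalStateException("Filtered record did not fit in the new buffer");
            }
        }
        return newQ;
    }

    public static void main(String[] args) {
        List<Rec> buffered = Arrays.asList(
            new Rec("shard-0", new byte[4]),
            new Rec("shard-1", new byte[4]),
            new Rec("shard-0", new byte[2]));
        ByteBoundQueue q = refilter(buffered, Collections.singleton("shard-1"), 6);
        System.out.println("retained=" + q.size() + " bytes=" + q.usedBytes());
    }
}
```

The point of throwing rather than silently ignoring the rejected `offer` is that a dropped record here would be data loss with no trace in the logs.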
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -294,22 +296,18 @@ private Runnable fetchRecords()
// If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
// from this message and back off for a bit to let the buffer drain before retrying.
- if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
Review Comment:
The comment above is no longer accurate -- we aren't grabbing new stream
iterators anymore when the buffer is full.
##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -656,25 +655,22 @@ For more detail, see [Segment size optimization](../../operations/segment-optimi
Kinesis indexing tasks fetch records using `fetchThreads` threads.
If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, with a delay between fetches
of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size `recordBufferSizeBytes`.
The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.
-When using Kinesis Producer Library's aggregation feature, that is when [`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual records.
-
The default values for these parameters are:
- `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
- `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller.
+- `maxRecordsPerPoll`: 1.
Review Comment:
So does that mean we should update the `maxRecordsPerPoll: 1` here?
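
For reference, the `fetchThreads` limit described in the doc hunk above (total fetched data capped at 5% of max heap, assuming 10 MB per thread per fetch) works out to roughly the arithmetic below. This is a sketch of the documented rule only, not the PR's code: the class and method names are hypothetical, "10 MB" is assumed to mean 10,000,000 bytes, and the floor of one thread is an assumption so that a small heap still fetches.

```java
final class FetchThreadCapSketch {
    // Assumption: "10 MB" is taken as 10,000,000 bytes per thread per fetch.
    static final long ASSUMED_BYTES_PER_FETCH = 10_000_000L;
    // 5% of max heap, expressed as an integer divisor to avoid floating point.
    static final long HEAP_DIVISOR = 20L;

    /** Lowers the requested thread count to what the heap budget allows. */
    static int capFetchThreads(int requestedThreads, long maxHeapBytes) {
        long budgetBytes = maxHeapBytes / HEAP_DIVISOR;
        // Assumption: never go below one thread, even on a very small heap.
        long allowed = Math.max(1L, budgetBytes / ASSUMED_BYTES_PER_FETCH);
        return (int) Math.min((long) requestedThreads, allowed);
    }

    public static void main(String[] args) {
        long maxHeap = Runtime.getRuntime().maxMemory();
        int requested = 2 * Runtime.getRuntime().availableProcessors();
        int effective = capFetchThreads(requested, maxHeap);
        if (effective < requested) {
            // Per the docs, an over-limit value is lowered with a warning, not rejected.
            System.out.println("warning: fetchThreads lowered from " + requested + " to " + effective);
        }
        System.out.println("effective fetchThreads=" + effective);
    }
}
```

With a 1,000,000,000-byte heap the budget is 50,000,000 bytes, so a request for 8 threads would be lowered to 5, while a request for 3 would be left alone.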
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]