zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1409815785
##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -656,25 +655,22 @@ For more detail, see [Segment size optimization](../../operations/segment-optimi
 Kinesis indexing tasks fetch records using `fetchThreads` threads.
 If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, with a delay between fetches
 of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size `recordBufferSizeBytes`.
 The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.
-When using Kinesis Producer Library's aggregation feature, that is when [`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual records.
-
 The default values for these parameters are:
 - `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
 is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
 - `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller.
+- `maxRecordsPerPoll`: 1.
Review Comment:
updated
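
For anyone following the thread, the sizing rules described in the doc hunk above can be sketched roughly as follows. This is only an illustration of the arithmetic in the doc text, not the code in this PR: the class name, method names, and constants are hypothetical, "10 MB" and "100 MB" are assumed to mean decimal megabytes, and the floor of one fetch thread is my own assumption.

```java
// Hypothetical sketch of the defaults described in the doc text; not Druid's implementation.
final class KinesisFetchSizingSketch {
    // Assumption: "10 MB of records at once" is treated as 10,000,000 bytes per fetch.
    static final long BYTES_PER_FETCH = 10_000_000L;
    // Assumption: "100 MB" is treated as 100,000,000 bytes.
    static final long MAX_RECORD_BUFFER_BYTES = 100_000_000L;

    // Default fetchThreads: twice the processors available to the task, where the
    // task's share is the server's processors divided by druid.worker.capacity.
    static int defaultFetchThreads(int serverProcessors, int workerCapacity) {
        int processorsPerTask = Math.max(1, serverProcessors / workerCapacity);
        return 2 * processorsPerTask;
    }

    // Lower the requested thread count so that (threads * 10 MB) stays within 5% of max heap.
    // The minimum of one thread is an assumption made for this sketch.
    static int effectiveFetchThreads(int requestedThreads, long maxHeapBytes) {
        long fetchBudgetBytes = maxHeapBytes / 20; // 5% of max heap
        int cap = (int) Math.max(1L, fetchBudgetBytes / BYTES_PER_FETCH);
        return Math.min(requestedThreads, cap);
    }

    // Default recordBufferSizeBytes: 100 MB or 10% of heap, whichever is smaller.
    static long defaultRecordBufferSizeBytes(long maxHeapBytes) {
        return Math.min(MAX_RECORD_BUFFER_BYTES, maxHeapBytes / 10);
    }
}
```

Under these assumptions, a task with a 1 GB max heap that requests 16 fetch threads would end up with 5, since 5% of 1 GB is 50 MB and each thread is assumed to hold up to 10 MB.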
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -437,42 +429,35 @@ public KinesisRecordSupplier(
{
Preconditions.checkNotNull(amazonKinesis);
this.kinesis = amazonKinesis;
- this.recordsPerFetch = recordsPerFetch;
this.fetchDelayMillis = fetchDelayMillis;
- this.deaggregate = deaggregate;
this.recordBufferOfferTimeout = recordBufferOfferTimeout;
this.recordBufferFullWait = recordBufferFullWait;
this.maxRecordsPerPoll = maxRecordsPerPoll;
this.fetchThreads = fetchThreads;
- this.recordBufferSize = recordBufferSize;
+ this.recordBufferSizeBytes = recordBufferSizeBytes;
this.useEarliestSequenceNumber = useEarliestSequenceNumber;
this.useListShards = useListShards;
this.backgroundFetchEnabled = fetchThreads > 0;
// the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache.
// The work around here is to use reflection to find the deaggregate function in the classpath. See details on the
// docs page for more information on how to use deaggregation
- if (deaggregate) {
- try {
- Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
- MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+ try {
+ Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
Review Comment:
updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]