zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1406907369


##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -656,25 +655,22 @@ For more detail, see [Segment size 
optimization](../../operations/segment-optimi
 
 Kinesis indexing tasks fetch records using `fetchThreads` threads.
 If `fetchThreads` is higher than the number of Kinesis shards, the excess 
threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a 
Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, 
with a delay between fetches
 of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size 
`recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size 
`recordBufferSizeBytes`.
 The main runner thread for each task polls up to `maxRecordsPerPoll` records 
from the queue at once.
 
-When using Kinesis Producer Library's aggregation feature, that is when 
[`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual 
records.
-
 The default values for these parameters are:
 
 - `fetchThreads`: Twice the number of processors available to the task. The 
number of processors available to the task
 is the total number of processors on the server, divided by 
`druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data 
record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB 
of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a 
warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
 - `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is 
smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 
1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever 
is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 
1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated 
records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, 
whichever is smaller.
+- `maxRecordsPerPoll`: 1.

Review Comment:
   Changed it so that it polls for at least one record and at most 1_000_000 
bytes if more than 1 record, which is what we were targeting for before.



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -179,15 +174,36 @@ AWSCredentialsConfig getAwsCredentialsConfig()
   }
 
   @VisibleForTesting
-  static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer 
configuredFetchThreads)
+  static int computeFetchThreads(
+      final RuntimeInfo runtimeInfo,
+      final Integer configuredFetchThreads
+  )
   {
-    final int fetchThreads;
+    int fetchThreads;
     if (configuredFetchThreads != null) {
       fetchThreads = configuredFetchThreads;
     } else {
       fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
     }
 
+    // Each fetchThread can return upto 10MB at a time
+    // 
(https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), 
cap fetchThreads so that
+    // we don't exceed more than the least of 100MB or 5% of heap at a time. 
Don't fail if fetchThreads specified
+    // is greater than this as to not cause failure for older configurations, 
but log warning in this case, and lower
+    // fetchThreads implicitly.
+    final long memoryToUse = Math.min(
+        KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
+        (long) (runtimeInfo.getMaxHeapSizeBytes() * 
KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
+    );
+    int maxFetchThreads = Math.max(
+        1,
+        (int) (memoryToUse / 10_000_000L)

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to