gianm commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1409823324


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -78,6 +82,14 @@ public KinesisIndexTask(
     );
     this.useListShards = useListShards;
     this.awsCredentialsConfig = awsCredentialsConfig;
+    if (tuningConfig.getRecordBufferSizeConfigured() != null) {

Review Comment:
   Please move these two checks to `run` rather than the constructor, because 
we don't need to log this stuff every time a task object is constructed. (That 
happens at various points on the Overlord due to various API calls and internal 
machinations, and will create a lot of log spam.)
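
A minimal sketch of the suggested shape, assuming a simplified task class. `DeferredLoggingTaskSketch`, the in-memory `LOG` list, and the message text are hypothetical stand-ins and are not taken from the PR. Only one of the two checks is shown. The point is that the constructor only stores state, and the check runs once when `run` is called:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: not the real KinesisIndexTask.
class DeferredLoggingTaskSketch
{
  // Hypothetical stand-in for a logger so the behavior is easy to observe.
  static final List<String> LOG = new ArrayList<>();

  private final Integer recordBufferSizeConfigured;

  DeferredLoggingTaskSketch(Integer recordBufferSizeConfigured)
  {
    // No logging here: task objects may be constructed many times
    // (for example on the Overlord) without ever being run.
    this.recordBufferSizeConfigured = recordBufferSizeConfigured;
  }

  void run()
  {
    // The check happens once per actual task execution.
    if (recordBufferSizeConfigured != null) {
      // Hypothetical message text.
      LOG.add("recordBufferSize was configured explicitly: " + recordBufferSizeConfigured);
    }
    // ... actual task work would follow here ...
  }
}
```

With this shape, constructing the task repeatedly produces no log lines; only a call to `run` does.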



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java:
##########
@@ -71,12 +69,10 @@ public KinesisSupervisorIOConfig(
       @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
       @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
       @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
-      @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
       @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
       @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
       @JsonProperty("awsExternalId") String awsExternalId,
-      @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
-      @JsonProperty("deaggregate") boolean deaggregate

Review Comment:
   The `recordsPerFetch` and `deaggregate` properties should stay here for 
better compatibility during rolling updates and rollbacks. (We don't want to 
lose track of them prior to a potential rollback.)
   
   So let's instead mark them deprecated, but keep them.
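
A rough sketch of why keeping the properties helps, under simplifying assumptions: a plain `Map` stands in for the serialized JSON, and `IoConfigCompatSketch`, `fromMap`, and `toMap` are hypothetical names. The real class binds these fields with `@JsonProperty` instead. Because the deprecated fields are still accepted and written back out, a spec that passes through the newer version keeps its values for an older version to read after a rollback:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: a Map stands in for the JSON form of the supervisor IO config.
class IoConfigCompatSketch
{
  private final Integer fetchDelayMillis;
  private final Integer recordsPerFetch; // deprecated: no longer used, kept for rollback
  private final boolean deaggregate;     // deprecated: no longer used, kept for rollback

  IoConfigCompatSketch(
      Integer fetchDelayMillis,
      @Deprecated Integer recordsPerFetch,
      @Deprecated boolean deaggregate
  )
  {
    this.fetchDelayMillis = fetchDelayMillis;
    this.recordsPerFetch = recordsPerFetch;
    this.deaggregate = deaggregate;
  }

  // Read the stored form, still accepting the deprecated keys.
  static IoConfigCompatSketch fromMap(Map<String, Object> json)
  {
    return new IoConfigCompatSketch(
        (Integer) json.get("fetchDelayMillis"),
        (Integer) json.get("recordsPerFetch"),
        Boolean.TRUE.equals(json.get("deaggregate"))
    );
  }

  // Write the stored form, still emitting the deprecated keys.
  Map<String, Object> toMap()
  {
    Map<String, Object> json = new LinkedHashMap<>();
    json.put("fetchDelayMillis", fetchDelayMillis);
    json.put("recordsPerFetch", recordsPerFetch);
    json.put("deaggregate", deaggregate);
    return json;
  }
}
```

If the two fields were dropped from the class instead, the values would be lost the first time the newer version rewrote the stored spec.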



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -179,15 +186,36 @@ AWSCredentialsConfig getAwsCredentialsConfig()
   }
 
   @VisibleForTesting
-  static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
+  static int computeFetchThreads(
+      final RuntimeInfo runtimeInfo,
+      final Integer configuredFetchThreads
+  )
   {
-    final int fetchThreads;
+    int fetchThreads;
     if (configuredFetchThreads != null) {
       fetchThreads = configuredFetchThreads;
     } else {
       fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
     }
 
+    // Each fetchThread can return upto 10MB at a time
+    // (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), cap fetchThreads so that
+    // we don't exceed more than the least of 100MB or 5% of heap at a time. Don't fail if fetchThreads specified
+    // is greater than this as to not cause failure for older configurations, but log warning in this case, and lower
+    // fetchThreads implicitly.
+    final long memoryToUse = Math.min(
+        KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
+        (long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
+    );
+    int maxFetchThreads = Math.max(
+        1,
+        (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL)
+    );
+    if (fetchThreads > maxFetchThreads) {
+      log.warn("fetchThreads [%d] being lowered to [%d]", fetchThreads, maxFetchThreads);

Review Comment:
   This warning should only get logged if `configuredFetchThreads != null`. 
There's no reason to log it when the default, 
`runtimeInfo.getAvailableProcessors() * 2`, is what exceeds `maxFetchThreads`.
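
One way the requested behavior could look, as a sketch rather than the PR's final code. The constants below are assumed from the 10MB, 100MB, and 5% figures in the diff comment. Plain `int`/`long` parameters replace `RuntimeInfo`, and a `WARNINGS` list is a hypothetical stand-in for `log.warn`. The cap is always applied, but the warning is recorded only when the user supplied a value:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: not the real KinesisIndexTask.computeFetchThreads.
class FetchThreadsSketch
{
  // Assumed values, taken from the figures quoted in the diff comment.
  static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;
  static final long MAX_RECORD_FETCH_MEMORY = 100_000_000L;
  static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;

  // Hypothetical stand-in for the task logger.
  static final List<String> WARNINGS = new ArrayList<>();

  static int computeFetchThreads(
      int availableProcessors,
      long maxHeapSizeBytes,
      Integer configuredFetchThreads
  )
  {
    int fetchThreads = configuredFetchThreads != null
                       ? configuredFetchThreads
                       : availableProcessors * 2;

    // Use at most the lesser of 100MB or 5% of heap for in-flight fetches.
    final long memoryToUse = Math.min(
        MAX_RECORD_FETCH_MEMORY,
        (long) (maxHeapSizeBytes * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
    );
    final int maxFetchThreads = Math.max(1, (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL));

    if (fetchThreads > maxFetchThreads) {
      // Warn only when the user asked for more threads than allowed;
      // capping the computed default is not worth a warning.
      if (configuredFetchThreads != null) {
        WARNINGS.add(
            String.format("fetchThreads [%d] being lowered to [%d]", fetchThreads, maxFetchThreads)
        );
      }
      fetchThreads = maxFetchThreads;
    }
    return fetchThreads;
  }
}
```

For example, with 8 processors and a 4GB heap the default of 16 is silently capped at 10, while an explicit setting of 20 is capped at 10 and also produces the warning.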



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

