gianm commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1409823324
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -78,6 +82,14 @@ public KinesisIndexTask(
);
this.useListShards = useListShards;
this.awsCredentialsConfig = awsCredentialsConfig;
+ if (tuningConfig.getRecordBufferSizeConfigured() != null) {
Review Comment:
Please move these two checks to `run` rather than the constructor, because
we don't need to log this stuff every time a task object is constructed. (That
happens at various points on the Overlord due to various API calls and internal
machinations, and will create a lot of log spam.)
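
A minimal sketch of the suggestion, assuming a simplified task class. `TaskSketch`, its in-memory `LOG` list, and the warning text are hypothetical stand-ins, not Druid code; the body of the original check is not shown in the hunk above, so the message here is only a placeholder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a task class; names are illustrative only.
final class TaskSketch
{
  // Stand-in for a real logger so the sketch has no dependencies.
  static final List<String> LOG = new ArrayList<>();

  private final Integer recordBufferSizeConfigured;

  TaskSketch(Integer recordBufferSizeConfigured)
  {
    // No logging here: task objects may be constructed many times
    // without the task ever running.
    this.recordBufferSizeConfigured = recordBufferSizeConfigured;
  }

  void run()
  {
    // The check runs once, when the task actually executes.
    if (recordBufferSizeConfigured != null) {
      LOG.add("placeholder warning about the configured record buffer size");
    }
    // ... the actual task work would follow here ...
  }
}
```

Constructing the object any number of times leaves the log untouched; only `run()` emits the message.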
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java:
##########
@@ -71,12 +69,10 @@ public KinesisSupervisorIOConfig(
@JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
- @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId,
- @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
- @JsonProperty("deaggregate") boolean deaggregate
Review Comment:
The `recordsPerFetch` and `deaggregate` properties should stay here for
better compatibility during rolling updates and rollbacks. (We don't want to
lose track of them prior to a potential rollback.)
So let's instead mark them deprecated, but keep them.
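
A rough sketch of what "deprecated, but kept" could look like, assuming a trimmed-down config class. `IoConfigSketch` is hypothetical. In the real class each constructor parameter carries a `@JsonProperty` annotation, as in the hunk above; those annotations are left out here so the sketch compiles without dependencies.

```java
// Hypothetical, trimmed-down stand-in for the supervisor IO config.
final class IoConfigSketch
{
  private final Integer fetchDelayMillis;
  // Deprecated properties: still accepted and stored so that a value
  // written by an older version is not lost before a rollback.
  private final Integer recordsPerFetch;
  private final boolean deaggregate;

  IoConfigSketch(
      Integer fetchDelayMillis,
      @Deprecated Integer recordsPerFetch,
      @Deprecated boolean deaggregate
  )
  {
    this.fetchDelayMillis = fetchDelayMillis;
    this.recordsPerFetch = recordsPerFetch;
    this.deaggregate = deaggregate;
  }

  Integer getFetchDelayMillis()
  {
    return fetchDelayMillis;
  }

  @Deprecated
  Integer getRecordsPerFetch()
  {
    return recordsPerFetch;
  }

  @Deprecated
  boolean isDeaggregate()
  {
    return deaggregate;
  }
}
```

The deprecated values are carried through unchanged, so whatever serializes the config can still write them back out.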
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -179,15 +186,36 @@ AWSCredentialsConfig getAwsCredentialsConfig()
}
@VisibleForTesting
- static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
+ static int computeFetchThreads(
+ final RuntimeInfo runtimeInfo,
+ final Integer configuredFetchThreads
+ )
{
- final int fetchThreads;
+ int fetchThreads;
if (configuredFetchThreads != null) {
fetchThreads = configuredFetchThreads;
} else {
fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
}
+ // Each fetchThread can return upto 10MB at a time
+ // (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), cap fetchThreads so that
+ // we don't exceed more than the least of 100MB or 5% of heap at a time. Don't fail if fetchThreads specified
+ // is greater than this as to not cause failure for older configurations, but log warning in this case, and lower
+ // fetchThreads implicitly.
+ final long memoryToUse = Math.min(
+ KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
+ (long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
+ );
+ int maxFetchThreads = Math.max(
+ 1,
+ (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL)
+ );
+ if (fetchThreads > maxFetchThreads) {
+ log.warn("fetchThreads [%d] being lowered to [%d]", fetchThreads, maxFetchThreads);
Review Comment:
This warning should only get logged if `configuredFetchThreads != null`.
There's no reason to log it when the default,
`runtimeInfo.getAvailableProcessors() * 2`, is what exceeds `maxFetchThreads`
and gets lowered.
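
A sketch of the capping logic with the warning restricted to explicitly configured values. `FetchThreadsSketch` and its `WARNINGS` list are hypothetical stand-ins for the real class and logger, and `RuntimeInfo` is replaced by two plain parameters. The constant values are assumptions read off the code comment in the diff (10MB per call, the least of 100MB or 5% of heap); the actual constants may be defined differently.

```java
import java.util.ArrayList;
import java.util.List;

final class FetchThreadsSketch
{
  // Assumed values, based on the code comment in the diff.
  static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;
  static final long MAX_RECORD_FETCH_MEMORY = 100_000_000L;
  static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;

  // Stand-in for log.warn so the sketch has no dependencies.
  static final List<String> WARNINGS = new ArrayList<>();

  static int computeFetchThreads(
      final int availableProcessors,
      final long maxHeapSizeBytes,
      final Integer configuredFetchThreads
  )
  {
    final int requested = configuredFetchThreads != null
                          ? configuredFetchThreads
                          : availableProcessors * 2;

    final long memoryToUse = Math.min(
        MAX_RECORD_FETCH_MEMORY,
        (long) (maxHeapSizeBytes * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
    );
    final int maxFetchThreads = Math.max(
        1,
        (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL)
    );

    if (requested > maxFetchThreads) {
      // Warn only when the user asked for the value being lowered;
      // capping the computed default is not worth a log line.
      if (configuredFetchThreads != null) {
        WARNINGS.add(
            String.format("fetchThreads [%d] being lowered to [%d]", requested, maxFetchThreads)
        );
      }
      return maxFetchThreads;
    }
    return requested;
  }
}
```

With these assumed constants, any heap of 2GB or more allows at most 10 fetch threads, so a 16-core default of 32 is capped silently, while an explicit setting of 20 is capped with a warning.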