jon-wei commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1401071581
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java:
##########
@@ -81,7 +80,7 @@ public KinesisIndexTaskTuningConfig(
Long handoffConditionTimeout,
Boolean resetOffsetAutomatically,
Boolean skipSequenceNumberAvailabilityCheck,
- Integer recordBufferSize,
+ @Nullable Integer recordBufferSizeBytes,
Review Comment:
Do you think it'd make sense to log a warning if the eliminated property is
provided?
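
For instance, such a deprecation warning might look roughly like this (a sketch only — the class and helper names here are hypothetical, not from the PR):

```java
public class DeprecatedPropertyWarning
{
  // Hypothetical helper: build a warning when the removed recordBufferSize
  // property is still supplied alongside the new recordBufferSizeBytes.
  static String warningFor(Integer recordBufferSize)
  {
    if (recordBufferSize == null) {
      return null;
    }
    return "Ignoring deprecated property recordBufferSize=" + recordBufferSize
           + "; use recordBufferSizeBytes instead.";
  }

  public static void main(String[] args)
  {
    System.out.println(warningFor(10000));
    System.out.println(warningFor(null));
  }
}
```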
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -179,15 +174,36 @@ AWSCredentialsConfig getAwsCredentialsConfig()
}
@VisibleForTesting
-  static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
+ static int computeFetchThreads(
+ final RuntimeInfo runtimeInfo,
+ final Integer configuredFetchThreads
+ )
{
- final int fetchThreads;
+ int fetchThreads;
if (configuredFetchThreads != null) {
fetchThreads = configuredFetchThreads;
} else {
fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
}
+ // Each fetchThread can return up to 10MB at a time
+ // (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), so cap fetchThreads such that
+ // we don't fetch more than the lesser of 100MB or 5% of heap at a time. Don't fail if the specified fetchThreads
+ // is greater than this cap, so as not to break older configurations, but log a warning in this case and lower
+ // fetchThreads implicitly.
+ final long memoryToUse = Math.min(
+ KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
+ (long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
+ );
+ int maxFetchThreads = Math.max(
+ 1,
+ (int) (memoryToUse / 10_000_000L)
Review Comment:
nit: maybe use a constant for the 10MB limit with a comment that explains
the limit comes from the Kinesis library
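
Something along these lines, perhaps (the constant and method names here are illustrative, not from the PR):

```java
public class FetchThreadCap
{
  // The Kinesis GetRecords API returns at most 10 MB per call, per
  // https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
  static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;

  // Cap the number of fetch threads so their combined worst-case
  // in-flight responses stay within the memory budget.
  static int maxFetchThreads(long memoryToUse)
  {
    return Math.max(1, (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL));
  }

  public static void main(String[] args)
  {
    System.out.println(maxFetchThreads(100_000_000L)); // 100MB budget -> 10 threads
    System.out.println(maxFetchThreads(5_000_000L));   // small budget -> floor of 1
  }
}
```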
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -437,42 +429,35 @@ public KinesisRecordSupplier(
{
Preconditions.checkNotNull(amazonKinesis);
this.kinesis = amazonKinesis;
- this.recordsPerFetch = recordsPerFetch;
this.fetchDelayMillis = fetchDelayMillis;
- this.deaggregate = deaggregate;
this.recordBufferOfferTimeout = recordBufferOfferTimeout;
this.recordBufferFullWait = recordBufferFullWait;
this.maxRecordsPerPoll = maxRecordsPerPoll;
this.fetchThreads = fetchThreads;
- this.recordBufferSize = recordBufferSize;
+ this.recordBufferSizeBytes = recordBufferSizeBytes;
this.useEarliestSequenceNumber = useEarliestSequenceNumber;
this.useListShards = useListShards;
this.backgroundFetchEnabled = fetchThreads > 0;
// the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache.
// The work around here is to use reflection to find the deaggregate function in the classpath. See details on the
// docs page for more information on how to use deaggregation
- if (deaggregate) {
- try {
- Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
- MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+ try {
+ Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
Review Comment:
Are the points about the licensing above still correct? Looks like
amazon-kinesis-client is Apache licensed now:
https://github.com/awslabs/amazon-kinesis-client/blob/master/LICENSE.txt
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -1059,11 +1045,22 @@ private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> pa
}
// filter records in buffer and only retain ones whose partition was not seeked
- BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
+ MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ =
+ new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
records.stream()
- .filter(x -> !partitions.contains(x.getStreamPartition()))
- .forEachOrdered(newQ::offer);
+ .filter(x -> !partitions.contains(x.getData().getStreamPartition()))
+ .forEachOrdered(x -> {
+ if (!newQ.offer(x)) {
Review Comment:
Is this a new failure mode? What would've happened in the old code if the
queue size was exceeded?
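
For reference, in the old code the method-reference form `forEachOrdered(newQ::offer)` discarded the boolean result of `offer`, so records were silently dropped once the capacity-bound queue filled. A small demonstration of that `LinkedBlockingQueue` behavior:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class SilentDropDemo
{
  public static void main(String[] args)
  {
    // A capacity-bound queue, as in the old code (capacity 1 for brevity).
    LinkedBlockingQueue<String> newQ = new LinkedBlockingQueue<>(1);

    // offer() returns false when the queue is full; if that return value
    // is ignored, the record is silently dropped rather than buffered.
    System.out.println(newQ.offer("record-1"));
    System.out.println(newQ.offer("record-2"));
  }
}
```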
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -294,22 +295,18 @@ private Runnable fetchRecords()
// If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
// from this message and back off for a bit to let the buffer drain before retrying.
- if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
+ recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+ while (!records.offer(
+ new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, recordSize),
+ recordBufferOfferWaitMillis,
+ TimeUnit.MILLISECONDS
+ )) {
log.warn(
"Kinesis records are being processed slower than they are fetched. "
+ "OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms].",
recordBufferFullWait
);
-
- shardIterator = kinesis.getShardIterator(
- currRecord.getStream(),
- currRecord.getPartitionId(),
- ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
- currRecord.getSequenceNumber()
- ).getShardIterator();
-
- scheduleBackgroundFetch(recordBufferFullWait);
- return;
+ recordBufferOfferWaitMillis = recordBufferFullWait;
Review Comment:
How come the shardIterator doesn't need to be reset here as before?
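
One possible reading of the new loop (an editor's sketch of the retry pattern, not the PR author's answer; `offerWithRetry` is a hypothetical helper): the timed offer is retried with `currRecord` still in hand, so the fetch thread never abandons its position in the shard and a re-seek may simply be unnecessary.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedOfferRetryDemo
{
  // Keep retrying a timed offer until it succeeds, as the new fetch loop does.
  // The record stays in hand across retries, so nothing is lost while the
  // buffer is full; the loop just resumes once the buffer drains.
  static int offerWithRetry(LinkedBlockingQueue<String> buffer, String record)
      throws InterruptedException
  {
    int attempts = 0;
    while (!buffer.offer(record, 20, TimeUnit.MILLISECONDS)) {
      attempts++;
    }
    return attempts;
  }

  public static void main(String[] args) throws InterruptedException
  {
    LinkedBlockingQueue<String> records = new LinkedBlockingQueue<>(1);
    records.offer("already-buffered");

    // Simulated consumer draining the full buffer after a short delay.
    Thread consumer = new Thread(() -> {
      try {
        Thread.sleep(200);
        records.take();
      }
      catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    int attempts = offerWithRetry(records, "current-record");
    consumer.join();

    System.out.println(attempts >= 1);  // offer was retried at least once
    System.out.println(records.peek()); // the record was never lost
  }
}
```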
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]