jon-wei commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1401071581


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java:
##########
@@ -81,7 +80,7 @@ public KinesisIndexTaskTuningConfig(
       Long handoffConditionTimeout,
       Boolean resetOffsetAutomatically,
       Boolean skipSequenceNumberAvailabilityCheck,
-      Integer recordBufferSize,
+      @Nullable Integer recordBufferSizeBytes,

Review Comment:
   Do you think it'd make sense to log a warning if the removed property 
(`recordBufferSize`) is still provided?
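
   A minimal sketch of what such a warning could look like, assuming the old 
value is still accepted as a nullable argument. The class and method names are 
hypothetical, and `java.util.logging` stands in for whatever logger the task 
code actually uses:

```java
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical helper, not part of the PR: warns when a removed property is still set.
class RemovedPropertyWarning
{
  private static final Logger LOG = Logger.getLogger(RemovedPropertyWarning.class.getName());

  // Returns the warning message if the removed property was supplied, or empty if it was not.
  static Optional<String> warnIfProvided(String removedName, Object value, String replacementName)
  {
    if (value == null) {
      return Optional.empty();
    }
    String message = String.format(
        "Property [%s] is no longer supported and will be ignored; use [%s] instead.",
        removedName,
        replacementName
    );
    LOG.warning(message);
    return Optional.of(message);
  }

  public static void main(String[] args)
  {
    // An older spec that still sets recordBufferSize triggers the warning instead of a failure.
    warnIfProvided("recordBufferSize", 10000, "recordBufferSizeBytes");
  }
}
```

   Returning the message as well as logging it keeps the check easy to unit 
test without capturing log output.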



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -179,15 +174,36 @@ AWSCredentialsConfig getAwsCredentialsConfig()
   }
 
   @VisibleForTesting
-  static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
+  static int computeFetchThreads(
+      final RuntimeInfo runtimeInfo,
+      final Integer configuredFetchThreads
+  )
   {
-    final int fetchThreads;
+    int fetchThreads;
     if (configuredFetchThreads != null) {
       fetchThreads = configuredFetchThreads;
     } else {
       fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
     }
 
+    // Each fetchThread can return upto 10MB at a time
+    // (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), cap fetchThreads so that
+    // we don't exceed more than the least of 100MB or 5% of heap at a time. Don't fail if fetchThreads specified
+    // is greater than this as to not cause failure for older configurations, but log warning in this case, and lower
+    // fetchThreads implicitly.
+    final long memoryToUse = Math.min(
+        KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
+        (long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
+    );
+    int maxFetchThreads = Math.max(
+        1,
+        (int) (memoryToUse / 10_000_000L)

Review Comment:
   nit: maybe use a constant for the 10MB limit with a comment that explains 
the limit comes from the Kinesis library
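
   A sketch of the suggested shape, assuming the values stated in the code 
comment above (10MB per fetch, a cap of 100MB or 5% of heap). The class name 
and all three constant names and values here are illustrative, not the ones 
in the PR:

```java
// Hypothetical stand-alone version of the cap computation, with the 10MB limit named.
class FetchThreadCap
{
  // Maximum data a single fetch can return at a time (10MB), per the AWS limits page
  // linked in the code comment in the diff.
  static final long MAX_BYTES_PER_FETCH = 10_000_000L;

  // Assumed from the code comment: use at most 100MB, or 5% of heap, whichever is smaller.
  static final long MAX_RECORD_FETCH_MEMORY = 100_000_000L;
  static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;

  // Upper bound on fetch threads for a given max heap size; never less than 1.
  static int maxFetchThreads(long maxHeapSizeBytes)
  {
    long memoryToUse = Math.min(
        MAX_RECORD_FETCH_MEMORY,
        (long) (maxHeapSizeBytes * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
    );
    return Math.max(1, (int) (memoryToUse / MAX_BYTES_PER_FETCH));
  }

  public static void main(String[] args)
  {
    System.out.println(maxFetchThreads(1_000_000_000L));
  }
}
```

   With these assumed values, a 1GB heap allows 5 fetch threads, heaps of 2GB 
and above are capped at 10, and very small heaps still get 1.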



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -437,42 +429,35 @@ public KinesisRecordSupplier(
   {
     Preconditions.checkNotNull(amazonKinesis);
     this.kinesis = amazonKinesis;
-    this.recordsPerFetch = recordsPerFetch;
     this.fetchDelayMillis = fetchDelayMillis;
-    this.deaggregate = deaggregate;
     this.recordBufferOfferTimeout = recordBufferOfferTimeout;
     this.recordBufferFullWait = recordBufferFullWait;
     this.maxRecordsPerPoll = maxRecordsPerPoll;
     this.fetchThreads = fetchThreads;
-    this.recordBufferSize = recordBufferSize;
+    this.recordBufferSizeBytes = recordBufferSizeBytes;
     this.useEarliestSequenceNumber = useEarliestSequenceNumber;
     this.useListShards = useListShards;
     this.backgroundFetchEnabled = fetchThreads > 0;
 
     // the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache.
     // The work around here is to use reflection to find the deaggregate function in the classpath. See details on the
     // docs page for more information on how to use deaggregation
-    if (deaggregate) {
-      try {
-        Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
-        MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+    try {
+      Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");

Review Comment:
   Are the points about the licensing above still correct? Looks like 
amazon-kinesis-client is Apache licensed now: 
https://github.com/awslabs/amazon-kinesis-client/blob/master/LICENSE.txt



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -1059,11 +1045,22 @@ private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> pa
     }
 
     // filter records in buffer and only retain ones whose partition was not seeked
-    BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
+    MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ =
+        new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
 
     records.stream()
-           .filter(x -> !partitions.contains(x.getStreamPartition()))
-           .forEachOrdered(newQ::offer);
+        .filter(x -> !partitions.contains(x.getData().getStreamPartition()))
+        .forEachOrdered(x -> {
+          if (!newQ.offer(x)) {

Review Comment:
   Is this a new failure mode? What would've happened in the old code if the 
queue size was exceeded?
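
   For reference on the old pattern: `LinkedBlockingQueue.offer(e)` returns 
`false` rather than throwing when the queue is full, and 
`forEachOrdered(newQ::offer)` discards that return value. The sketch below 
uses plain strings and a hypothetical class name to show the effect. Whether 
the old code could ever hit this depends on how `records` was sized, which 
this hunk does not show:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo: copying into a bounded queue via a method reference drops overflow silently.
class SilentOfferDrop
{
  // Copies records into a new bounded queue the way the old code did and
  // returns how many records the new queue actually retained.
  static int retainedAfterCopy(List<String> records, int capacity)
  {
    BlockingQueue<String> newQ = new LinkedBlockingQueue<>(capacity);
    // offer() returns false when the queue is full; the method reference ignores that result.
    records.stream().forEachOrdered(newQ::offer);
    return newQ.size();
  }

  public static void main(String[] args)
  {
    // Three records, capacity two: the third is dropped without any exception or log line.
    System.out.println(retainedAfterCopy(Arrays.asList("a", "b", "c"), 2));
  }
}
```

   Checking the result of `offer`, as the new code does, makes an overflow 
visible instead of silent.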



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -294,22 +295,18 @@ private Runnable fetchRecords()
 
             // If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
             // from this message and back off for a bit to let the buffer drain before retrying.
-            if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
+            recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+            while (!records.offer(
+                new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, recordSize),
+                recordBufferOfferWaitMillis,
+                TimeUnit.MILLISECONDS
+            )) {
               log.warn(
                   "Kinesis records are being processed slower than they are fetched. "
                   + "OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms].",
                   recordBufferFullWait
               );
-
-              shardIterator = kinesis.getShardIterator(
-                  currRecord.getStream(),
-                  currRecord.getPartitionId(),
-                  ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-                  currRecord.getSequenceNumber()
-              ).getShardIterator();
-
-              scheduleBackgroundFetch(recordBufferFullWait);
-              return;
+              recordBufferOfferWaitMillis = recordBufferFullWait;

Review Comment:
   How come the shardIterator doesn't need to be reset here as before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

