This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d4e8053a4a Kinesis adaptive memory management (#15360)
9d4e8053a4a is described below

commit 9d4e8053a4a84e6d106afdf8b27e62d2f396ab02
Author: zachjsh <[email protected]>
AuthorDate: Fri Jan 19 14:30:21 2024 -0500

    Kinesis adaptive memory management (#15360)
    
    ### Description
    
    Our Kinesis consumer works by calling the [GetRecords API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) from some number of `fetchThreads`, each fetching up to `recordsPerFetch` records and inserting them into a shared buffer that can hold `recordBufferSize` records. The logic is described in our documentation at: https://druid.apache.org/docs/27.0.0/development/extensions-core/kinesis-ingestion/#determine-fetch-settings
    
    This PR fixes a problem with that logic: the memory limits rely on a hard-coded "estimated record size" of `10 KB` if `deaggregate: false` and `1 MB` if `deaggregate: true`. There have been cases where a supervisor had `deaggregate: true` set even though it wasn't needed, leading to under-utilization of memory and poor ingestion performance.
    
    Users don't always know whether their records are aggregated. Even if they could figure it out, it's better not to have to. So we'd like to eliminate the `deaggregate` parameter, which means we need to manage memory adaptively based on actual record sizes.
    
    We take advantage of the fact that a single GetRecords call doesn't return more than 10 MB (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).

    This PR:
    
    Eliminates `recordsPerFetch`; we always use the maximum limit of 10,000 records per call (the default when the limit is not set).

    Eliminates `deaggregate`; deaggregation is now always enabled.
    
    Caps `fetchThreads` to ensure that even if each fetch returns the maximum (`10MB`), we don't exceed our budget (`100MB` or `5% of heap`, whichever is smaller). In practice this means `fetchThreads` will never exceed `10`. Tasks usually don't have that many processors available to them anyway, so this is unlikely to change the thread count for most deployments.
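    The capping logic can be sketched as follows. This is a simplified illustration based on the description above, not the exact code in this commit; the class and method names are illustrative, and the constants come from the budget described here:

```java
public class FetchThreadCap
{
  // Constants from the description: one GetRecords call returns at most 10 MB,
  // and the fetch budget is the smaller of 100 MB or 5% of the max heap.
  private static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;
  private static final long MAX_RECORD_FETCH_MEMORY = 100_000_000L;
  private static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;

  public static int computeFetchThreads(long maxHeapSizeBytes, int availableProcessors, Integer configuredFetchThreads)
  {
    // Default: twice the number of processors available to the task.
    int fetchThreads = configuredFetchThreads != null ? configuredFetchThreads : availableProcessors * 2;

    // Budget for in-flight fetches: min(100 MB, 5% of heap).
    final long memoryToUse = Math.min(
        MAX_RECORD_FETCH_MEMORY,
        (long) (maxHeapSizeBytes * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
    );
    final int maxFetchThreads = Math.max(1, (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL));

    // Lower the value implicitly rather than failing, so older configs keep working.
    return Math.min(fetchThreads, maxFetchThreads);
  }
}
```

    With a 4 GB heap the budget is min(100 MB, 200 MB) = 100 MB, so at most 10 threads regardless of processor count.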
    
    Adds `recordBufferSizeBytes` as a bytes-based limit, rather than a records-based limit, for the shared queue. We do know the byte size of Kinesis records by this point. The default is `100MB` or `10% of heap`, whichever is smaller.
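    As a sketch, the default computation might look like this (constants taken from the description; the class and method names are illustrative, not the actual implementation):

```java
public class RecordBufferDefaults
{
  // From the description: buffer budget is the smaller of 100 MB or 10% of heap.
  private static final long MAX_RECORD_BUFFER_MEMORY = 100_000_000L;
  private static final double RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION = 0.10;

  public static long recordBufferSizeBytesOrDefault(Long configured, long maxHeapSizeBytes)
  {
    if (configured != null) {
      return configured;  // an explicit setting wins
    }
    return Math.min(
        MAX_RECORD_BUFFER_MEMORY,
        (long) (maxHeapSizeBytes * RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION)
    );
  }
}
```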
    
    Adds `maxBytesPerPoll` as a bytes-based limit on how much data we poll from the shared buffer at a time. The default is `1000000` bytes.
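    A bytes-bounded poll drains records until the byte budget is reached, while guaranteeing progress by always returning at least one record. A minimal sketch, not the actual implementation (the `SizedRecord` wrapper is hypothetical):

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BytesBoundedPoll
{
  // Illustrative wrapper: each buffered record carries its byte size.
  public record SizedRecord(String data, long sizeBytes) {}

  public static List<SizedRecord> poll(Deque<SizedRecord> buffer, long maxBytesPerPoll)
  {
    final List<SizedRecord> polled = new ArrayList<>();
    long bytesPolled = 0;
    // Take the first record unconditionally, then keep taking while the next
    // record still fits within maxBytesPerPoll.
    while (!buffer.isEmpty()
           && (polled.isEmpty() || bytesPolled + buffer.peekFirst().sizeBytes() <= maxBytesPerPoll)) {
      final SizedRecord r = buffer.pollFirst();
      bytesPolled += r.sizeBytes();
      polled.add(r);
    }
    return polled;
  }
}
```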
    
    Deprecates `recordBufferSize` in favor of `recordBufferSizeBytes`. A warning is logged if `recordBufferSize` is specified.
    
    Deprecates `maxRecordsPerPoll` in favor of `maxBytesPerPoll`. A warning is logged if `maxRecordsPerPoll` is specified.
    
    This PR also fixes an issue where, when the record buffer was full, the fetchRecords logic threw away the rest of the GetRecords result after `recordBufferOfferTimeout` and started a new shard iterator. This was excessively churny. Instead, we now wait an unbounded amount of time for the queue to stop being full; if the queue remains full, we end up right back waiting for it after the restarted fetch anyway.
    
    There was also an unchecked call to `newQ::offer` in `filterBufferAndResetBackgroundFetch`, which could cause silent data loss. The return value is now checked, and the task fails if it is false.
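    The failure mode is easy to see in a stripped-down version of the queue's byte accounting (queue storage omitted; this mirrors the `offer` logic of the `MemoryBoundLinkedBlockingQueue` appearing in the diff below, but the demo class itself is illustrative):

```java
import java.util.concurrent.atomic.AtomicLong;

public class MemoryBoundDemo
{
  private final long memoryBound;
  private final AtomicLong currentMemory = new AtomicLong(0L);

  public MemoryBoundDemo(long memoryBound)
  {
    this.memoryBound = memoryBound;
  }

  // Returns false, rather than blocking, when accepting the item would exceed
  // the byte bound. Ignoring this return value silently drops the record.
  public boolean offer(long itemSizeBytes)
  {
    if (currentMemory.addAndGet(itemSizeBytes) <= memoryBound) {
      return true;
    }
    currentMemory.addAndGet(-itemSizeBytes);
    return false;
  }
}
```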
    
    ### Release Note
    
    The Kinesis ingestion memory tuning configuration has been greatly simplified and now takes a more adaptive approach. Summary of the changes:
    
    Eliminates `recordsPerFetch`; Druid now always uses the maximum limit of 10,000 records per call (the default when the limit is not set).

    Eliminates `deaggregate`; deaggregation is now always enabled.
    
    Caps `fetchThreads` to ensure that even if each fetch returns the maximum (`10MB`), the budget (`100MB` or `5% of heap`, whichever is smaller) is not exceeded. In practice this means `fetchThreads` will never exceed `10`. Tasks usually don't have that many processors available to them anyway, so this is unlikely to change the thread count for most deployments.
    
    Adds `recordBufferSizeBytes` as a bytes-based limit, rather than a records-based limit, for the shared queue. The byte size of Kinesis records is known by this point. The default is `100MB` or `10% of heap`, whichever is smaller.
    
    Adds `maxBytesPerPoll` as a bytes-based limit on how much data is polled from the shared buffer at a time. The default is `1000000` bytes.
    
    Deprecates `recordBufferSize` in favor of `recordBufferSizeBytes`. A warning is logged if `recordBufferSize` is specified.
    
    Deprecates `maxRecordsPerPoll` in favor of `maxBytesPerPoll`. A warning is logged if `maxRecordsPerPoll` is specified.
---
 .../extensions-core/kinesis-ingestion.md           |  27 +-
 .../apache/druid/emitter/kafka/KafkaEmitter.java   |   6 +-
 .../kafka/MemoryBoundLinkedBlockingQueue.java      |  85 ------
 .../druid/indexing/kinesis/KinesisIndexTask.java   |  69 +++--
 .../indexing/kinesis/KinesisIndexTaskIOConfig.java |  56 +---
 .../kinesis/KinesisIndexTaskTuningConfig.java      |  63 ++--
 .../indexing/kinesis/KinesisRecordSupplier.java    | 168 ++++++-----
 .../druid/indexing/kinesis/KinesisSamplerSpec.java |   6 +-
 .../kinesis/supervisor/KinesisSupervisor.java      |  10 +-
 .../supervisor/KinesisSupervisorIOConfig.java      |   4 +-
 .../supervisor/KinesisSupervisorTuningConfig.java  |  15 +-
 .../indexing/kinesis/KinesisIOConfigTest.java      |  42 +--
 .../kinesis/KinesisIndexTaskSerdeTest.java         |   6 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |  33 +--
 .../kinesis/KinesisIndexTaskTuningConfigTest.java  |  24 +-
 .../kinesis/KinesisRecordSupplierTest.java         | 212 ++++----------
 .../supervisor/KinesisSupervisorIOConfigTest.java  |   8 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  | 106 ++++---
 .../TestModifiedKinesisIndexTaskTuningConfig.java  |   6 +
 .../common/MemoryBoundLinkedBlockingQueue.java     | 211 ++++++++++++++
 .../common/MemoryBoundLinkedBlockingQueueTest.java | 319 +++++++++++++++++++++
 .../druid-models/ingestion-spec/ingestion-spec.tsx |  30 +-
 website/.spelling                                  |   1 +
 23 files changed, 907 insertions(+), 600 deletions(-)

diff --git a/docs/development/extensions-core/kinesis-ingestion.md 
b/docs/development/extensions-core/kinesis-ingestion.md
index 5071b153366..7d0709d99cf 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -241,11 +241,9 @@ The following table outlines the configuration options for 
`ioConfig`:
 |`completionTimeout`|ISO 8601 period|The length of time to wait before Druid 
declares a publishing task has failed and terminates it. If this is set too 
low, your tasks may never publish. The publishing clock for a task begins 
roughly after `taskDuration` elapses.|No|PT6H|
 |`lateMessageRejectionPeriod`|ISO 8601 period|Configure tasks to reject 
messages with timestamps earlier than this period before the task is created. 
For example, if `lateMessageRejectionPeriod` is set to `PT1H` and the 
supervisor creates a task at `2016-01-01T12:00Z`, messages with timestamps 
earlier than `2016-01-01T11:00Z` are dropped. This may help prevent concurrency 
issues if your data stream has late messages and you have multiple pipelines 
that need to operate on the same segment [...]
 |`earlyMessageRejectionPeriod`|ISO 8601 period|Configure tasks to reject 
messages with timestamps later than this period after the task reached its 
`taskDuration`. For example, if `earlyMessageRejectionPeriod` is set to `PT1H`, 
the `taskDuration` is set to `PT1H` and the supervisor creates a task at 
`2016-01-01T12:00Z`. Messages with timestamps later than `2016-01-01T14:00Z` 
are dropped. **Note:** Tasks sometimes run past their task duration, for 
example, in cases of supervisor failover. [...]
-|`recordsPerFetch`|Integer|The number of records to request per call to fetch 
records from Kinesis.|No| See [Determine fetch 
settings](#determine-fetch-settings) for defaults.|
 |`fetchDelayMillis`|Integer|Time in milliseconds to wait between subsequent 
calls to fetch records from Kinesis. See [Determine fetch 
settings](#determine-fetch-settings).|No|0|
 |`awsAssumedRoleArn`|String|The AWS assumed role to use for additional 
permissions.|No||
 |`awsExternalId`|String|The AWS external ID to use for additional 
permissions.|No||
-|`deaggregate`|Boolean|Whether to use the deaggregate function of the Kinesis 
Client Library (KCL).|No||
 |`autoScalerConfig`|Object|Defines autoscaling behavior for Kinesis ingest 
tasks. See [Task autoscaler properties](#task-autoscaler-properties) for more 
information.|No|null|
 
 ### Task autoscaler properties
@@ -406,7 +404,7 @@ The following table outlines the configuration options for 
`tuningConfig`:
 |`chatRetries`|Integer|The number of times Druid retries HTTP requests to 
indexing tasks before considering tasks unresponsive.|No|8|
 |`httpTimeout`|ISO 8601 period|The period of time to wait for a HTTP response 
from an indexing task.|No|PT10S|
 |`shutdownTimeout`|ISO 8601 period|The period of time to wait for the 
supervisor to attempt a graceful shutdown of tasks before exiting.|No|PT80S|
-|`recordBufferSize`|Integer|The size of the buffer (number of events) Druid 
uses between the Kinesis fetch threads and the main ingestion thread.|No|See 
[Determine fetch settings](#determine-fetch-settings) for defaults.|
+|`recordBufferSizeBytes`|Integer| The size of the buffer (heap memory bytes) 
Druid uses between the Kinesis fetch threads and the main ingestion thread.|No| 
See [Determine fetch settings](#determine-fetch-settings) for defaults.|
 |`recordBufferOfferTimeout`|Integer|The number of milliseconds to wait for 
space to become available in the buffer before timing out.|No|5000|
 |`recordBufferFullWait`|Integer|The number of milliseconds to wait for the 
buffer to drain before Druid attempts to fetch records from Kinesis 
again.|No|5000|
 |`fetchThreads`|Integer|The size of the pool of threads fetching data from 
Kinesis. There is no benefit in having more threads than Kinesis shards.|No| 
`procs * 2`, where `procs` is the number of processors available to the task.|
@@ -414,7 +412,7 @@ The following table outlines the configuration options for 
`tuningConfig`:
 |`logParseExceptions`|Boolean|If `true`, Druid logs an error message when a 
parsing exception occurs, containing information about the row where the error 
occurred.|No|`false`|
 |`maxParseExceptions`|Integer|The maximum number of parse exceptions that can 
occur before the task halts ingestion and fails. Overridden if 
`reportParseExceptions` is set.|No|unlimited|
 |`maxSavedParseExceptions`|Integer|When a parse exception occurs, Druid keeps 
track of the most recent parse exceptions. `maxSavedParseExceptions` limits the 
number of saved exception instances. These saved exceptions are available after 
the task finishes in the [task completion 
report](../../ingestion/tasks.md#task-reports). Overridden if 
`reportParseExceptions` is set.|No|0|
-|`maxRecordsPerPoll`|Integer|The maximum number of records to be fetched from 
buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, 
Max(bufferSize, 1))`.|No| See [Determine fetch 
settings](#determine-fetch-settings) for defaults.|
+|`maxBytesPerPoll`|Integer| The maximum number of bytes to be fetched from 
buffer per poll. At least one record is polled from the buffer regardless of 
this config.|No| 1000000 bytes|
 |`repartitionTransitionDuration`|ISO 8601 period|When shards are split or 
merged, the supervisor recomputes shard to task group mappings. The supervisor 
also signals any running tasks created under the old mappings to stop early at 
current time + `repartitionTransitionDuration`. Stopping the tasks early allows 
Druid to begin reading from the new shards more quickly. The repartition 
transition wait time controlled by this property gives the stream additional 
time to write records to the n [...]
 |`offsetFetchPeriod`|ISO 8601 period|Determines how often the supervisor 
queries Kinesis and the indexing tasks to fetch current offsets and calculate 
lag. If the user-specified value is below the minimum value of PT5S, the 
supervisor ignores the value and uses the minimum value instead.|No|PT30S|
 |`useListShards`|Boolean|Indicates if `listShards` API of AWS Kinesis SDK can 
be used to prevent `LimitExceededException` during ingestion. You must set the 
necessary `IAM` permissions.|No|`false`|
@@ -656,25 +654,22 @@ For more detail, see [Segment size 
optimization](../../operations/segment-optimi
 
 Kinesis indexing tasks fetch records using `fetchThreads` threads.
 If `fetchThreads` is higher than the number of Kinesis shards, the excess 
threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a 
Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, 
with a delay between fetches
 of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size 
`recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size 
`recordBufferSizeBytes`.
 The main runner thread for each task polls up to `maxRecordsPerPoll` records 
from the queue at once.
 
-When using Kinesis Producer Library's aggregation feature, that is when 
[`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual 
records.
-
 The default values for these parameters are:
 
 - `fetchThreads`: Twice the number of processors available to the task. The 
number of processors available to the task
 is the total number of processors on the server, divided by 
`druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data 
record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB 
of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a 
warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
 - `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is 
smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 
1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever 
is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 
1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated 
records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, 
whichever is smaller.
+- `maxBytesPerPoll`: 1000000.
 
 Kinesis places the following restrictions on calls to fetch records:
 
@@ -697,8 +692,6 @@ Kinesis stream.
 The Kinesis indexing service supports de-aggregation of multiple rows packed 
into a single record by the Kinesis
 Producer Library's aggregate method for more efficient data transfer.
 
-To enable this feature, set `deaggregate` to true in your `ioConfig` when 
submitting a supervisor spec.
-
 ## Resharding
 
 
[Resharding](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html)
 is an advanced operation that lets you adjust the number of shards in a stream 
to adapt to changes in the rate of data flowing through a stream.
diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 87183d62fe7..661543f707c 100644
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType;
-import 
org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
+import org.apache.druid.java.util.common.MemoryBoundLinkedBlockingQueue;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -176,7 +176,7 @@ public class KafkaEmitter implements Emitter
 
   private void sendToKafka(final String topic, 
MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
   {
-    ObjectContainer<String> objectToSend;
+    MemoryBoundLinkedBlockingQueue.ObjectContainer<String> objectToSend;
     try {
       while (true) {
         objectToSend = recordQueue.take();
@@ -206,7 +206,7 @@ public class KafkaEmitter implements Emitter
 
         String resultJson = jsonMapper.writeValueAsString(map);
 
-        ObjectContainer<String> objectContainer = new ObjectContainer<>(
+        MemoryBoundLinkedBlockingQueue.ObjectContainer<String> objectContainer 
= new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(
             resultJson,
             StringUtils.toUtf8(resultJson).length
         );
diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java
deleted file mode 100644
index fb6cae8ee95..00000000000
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.emitter.kafka;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Similar to LinkedBlockingQueue but can be bounded by the total byte size of 
the items present in the queue
- * rather than number of items.
- */
-public class MemoryBoundLinkedBlockingQueue<T>
-{
-  private final long memoryBound;
-  private final AtomicLong currentMemory;
-  private final LinkedBlockingQueue<ObjectContainer<T>> queue;
-
-  public MemoryBoundLinkedBlockingQueue(long memoryBound)
-  {
-    this.memoryBound = memoryBound;
-    this.currentMemory = new AtomicLong(0L);
-    this.queue = new LinkedBlockingQueue<>();
-  }
-
-  // returns true/false depending on whether item was added or not
-  public boolean offer(ObjectContainer<T> item)
-  {
-    final long itemLength = item.getSize();
-
-    if (currentMemory.addAndGet(itemLength) <= memoryBound) {
-      if (queue.offer(item)) {
-        return true;
-      }
-    }
-    currentMemory.addAndGet(-itemLength);
-    return false;
-  }
-
-  // blocks until at least one item is available to take
-  public ObjectContainer<T> take() throws InterruptedException
-  {
-    final ObjectContainer<T> ret = queue.take();
-    currentMemory.addAndGet(-ret.getSize());
-    return ret;
-  }
-
-  public static class ObjectContainer<T>
-  {
-    private T data;
-    private long size;
-
-    ObjectContainer(T data, long size)
-    {
-      this.data = data;
-      this.size = size;
-    }
-
-    public T getData()
-    {
-      return data;
-    }
-
-    public long getSize()
-    {
-      return size;
-    }
-  }
-}
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index 1aa6d5b2e3b..fb019f10030 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -49,6 +49,10 @@ import java.util.Set;
 public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, 
ByteEntity>
 {
   private static final String TYPE = "index_kinesis";
+
+  // GetRecords returns maximum 10MB per call
+  // 
(https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
+  private static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;
   private static final Logger log = new Logger(KinesisIndexTask.class);
 
   private final boolean useListShards;
@@ -84,6 +88,14 @@ public class KinesisIndexTask extends 
SeekableStreamIndexTask<String, String, By
   public TaskStatus runTask(TaskToolbox toolbox)
   {
     this.runtimeInfo = toolbox.getAdjustedRuntimeInfo();
+    if (getTuningConfig().getRecordBufferSizeConfigured() != null) {
+      log.warn("The 'recordBufferSize' config property of the kinesis tuning 
config has been deprecated. "
+               + "Please use 'recordBufferSizeBytes'.");
+    }
+    if (getTuningConfig().getMaxRecordsPerPollConfigured() != null) {
+      log.warn("The 'maxRecordsPerPoll' config property of the kinesis tuning 
config has been deprecated. "
+               + "Please use 'maxBytesPerPoll'.");
+    }
     return super.runTask(toolbox);
   }
 
@@ -105,21 +117,18 @@ public class KinesisIndexTask extends 
SeekableStreamIndexTask<String, String, By
   {
     KinesisIndexTaskIOConfig ioConfig = ((KinesisIndexTaskIOConfig) 
super.ioConfig);
     KinesisIndexTaskTuningConfig tuningConfig = 
((KinesisIndexTaskTuningConfig) super.tuningConfig);
+    final int recordBufferSizeBytes =
+        
tuningConfig.getRecordBufferSizeBytesOrDefault(runtimeInfo.getMaxHeapSizeBytes());
     final int fetchThreads = computeFetchThreads(runtimeInfo, 
tuningConfig.getFetchThreads());
-    final int recordsPerFetch = 
ioConfig.getRecordsPerFetchOrDefault(runtimeInfo.getMaxHeapSizeBytes(), 
fetchThreads);
-    final int recordBufferSize =
-        
tuningConfig.getRecordBufferSizeOrDefault(runtimeInfo.getMaxHeapSizeBytes(), 
ioConfig.isDeaggregate());
-    final int maxRecordsPerPoll = 
tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate());
+    final int maxBytesPerPoll = tuningConfig.getMaxBytesPerPollOrDefault();
 
     log.info(
-        "Starting record supplier with fetchThreads [%d], fetchDelayMillis 
[%d], recordsPerFetch [%d], "
-        + "recordBufferSize [%d], maxRecordsPerPoll [%d], deaggregate [%s].",
+        "Starting record supplier with fetchThreads [%d], fetchDelayMillis 
[%d], "
+        + "recordBufferSizeBytes [%d], maxBytesPerPoll [%d]",
         fetchThreads,
         ioConfig.getFetchDelayMillis(),
-        recordsPerFetch,
-        recordBufferSize,
-        maxRecordsPerPoll,
-        ioConfig.isDeaggregate()
+        recordBufferSizeBytes,
+        maxBytesPerPoll
     );
 
     return new KinesisRecordSupplier(
@@ -129,19 +138,24 @@ public class KinesisIndexTask extends 
SeekableStreamIndexTask<String, String, By
             ioConfig.getAwsAssumedRoleArn(),
             ioConfig.getAwsExternalId()
         ),
-        recordsPerFetch,
         ioConfig.getFetchDelayMillis(),
         fetchThreads,
-        ioConfig.isDeaggregate(),
-        recordBufferSize,
+        recordBufferSizeBytes,
         tuningConfig.getRecordBufferOfferTimeout(),
         tuningConfig.getRecordBufferFullWait(),
-        maxRecordsPerPoll,
+        maxBytesPerPoll,
         false,
         useListShards
     );
   }
 
+  @Override
+  @JsonProperty
+  public KinesisIndexTaskTuningConfig getTuningConfig()
+  {
+    return (KinesisIndexTaskTuningConfig) super.getTuningConfig();
+  }
+
   @Override
   @JsonProperty("ioConfig")
   public KinesisIndexTaskIOConfig getIOConfig()
@@ -179,15 +193,38 @@ public class KinesisIndexTask extends 
SeekableStreamIndexTask<String, String, By
   }
 
   @VisibleForTesting
-  static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer 
configuredFetchThreads)
+  static int computeFetchThreads(
+      final RuntimeInfo runtimeInfo,
+      final Integer configuredFetchThreads
+  )
   {
-    final int fetchThreads;
+    int fetchThreads;
     if (configuredFetchThreads != null) {
       fetchThreads = configuredFetchThreads;
     } else {
       fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
     }
 
+    // Each fetchThread can return upto 10MB at a time
+    // 
(https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), 
cap fetchThreads so that
+    // we don't exceed more than the least of 100MB or 5% of heap at a time. 
Don't fail if fetchThreads specified
+    // is greater than this as to not cause failure for older configurations, 
but log warning in this case, and lower
+    // fetchThreads implicitly.
+    final long memoryToUse = Math.min(
+        KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
+        (long) (runtimeInfo.getMaxHeapSizeBytes() * 
KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
+    );
+    int maxFetchThreads = Math.max(
+        1,
+        (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL)
+    );
+    if (fetchThreads > maxFetchThreads) {
+      if (configuredFetchThreads != null) {
+        log.warn("fetchThreads [%d] being lowered to [%d]", 
configuredFetchThreads, maxFetchThreads);
+      }
+      fetchThreads = maxFetchThreads;
+    }
+
     Preconditions.checkArgument(
         fetchThreads > 0,
         "Must have at least one background fetch thread for the record 
supplier"
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
index 0572c317006..881d68ba896 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
 import org.apache.druid.data.input.InputFormat;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
@@ -41,21 +40,19 @@ public class KinesisIndexTaskIOConfig extends 
SeekableStreamIndexTaskIOConfig<St
    * Together with {@link 
KinesisIndexTaskTuningConfig#MAX_RECORD_BUFFER_MEMORY}, don't take up more than 
200MB
    * per task.
    */
-  private static final int MAX_RECORD_FETCH_MEMORY = 100_000_000;
+  public static final int MAX_RECORD_FETCH_MEMORY = 100_000_000;
 
   /**
    * Together with {@link 
KinesisIndexTaskTuningConfig#RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION}, don't 
take up more
    * than 15% of the heap.
    */
-  private static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;
+  public static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;
 
   private final String endpoint;
-  private final Integer recordsPerFetch;
   private final int fetchDelayMillis;
 
   private final String awsAssumedRoleArn;
   private final String awsExternalId;
-  private final boolean deaggregate;
 
   @JsonCreator
   public KinesisIndexTaskIOConfig(
@@ -79,11 +76,9 @@ public class KinesisIndexTaskIOConfig extends 
SeekableStreamIndexTaskIOConfig<St
       @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
       @JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
       @JsonProperty("endpoint") String endpoint,
-      @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
       @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
       @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
-      @JsonProperty("awsExternalId") String awsExternalId,
-      @JsonProperty("deaggregate") boolean deaggregate
+      @JsonProperty("awsExternalId") String awsExternalId
   )
   {
     super(
@@ -105,11 +100,9 @@ public class KinesisIndexTaskIOConfig extends 
SeekableStreamIndexTaskIOConfig<St
     );
 
     this.endpoint = Preconditions.checkNotNull(endpoint, "endpoint");
-    this.recordsPerFetch = recordsPerFetch;
     this.fetchDelayMillis = fetchDelayMillis != null ? fetchDelayMillis : 
DEFAULT_FETCH_DELAY_MILLIS;
     this.awsAssumedRoleArn = awsAssumedRoleArn;
     this.awsExternalId = awsExternalId;
-    this.deaggregate = deaggregate;
   }
 
   public KinesisIndexTaskIOConfig(
@@ -122,11 +115,9 @@ public class KinesisIndexTaskIOConfig extends 
SeekableStreamIndexTaskIOConfig<St
       DateTime maximumMessageTime,
       InputFormat inputFormat,
       String endpoint,
-      Integer recordsPerFetch,
       Integer fetchDelayMillis,
       String awsAssumedRoleArn,
-      String awsExternalId,
-      boolean deaggregate
+      String awsExternalId
   )
   {
     this(
@@ -142,11 +133,9 @@ public class KinesisIndexTaskIOConfig extends 
SeekableStreamIndexTaskIOConfig<St
         maximumMessageTime,
         inputFormat,
         endpoint,
-        recordsPerFetch,
         fetchDelayMillis,
         awsAssumedRoleArn,
-        awsExternalId,
-        deaggregate
+        awsExternalId
     );
   }
 
@@ -215,32 +204,6 @@ public class KinesisIndexTaskIOConfig extends 
SeekableStreamIndexTaskIOConfig<St
     return endpoint;
   }
 
-  @Nullable
-  @JsonProperty("recordsPerFetch")
-  @JsonInclude(JsonInclude.Include.NON_NULL)
-  public Integer getRecordsPerFetchConfigured()
-  {
-    return recordsPerFetch;
-  }
-
-  public int getRecordsPerFetchOrDefault(final long maxHeapSize, final int 
fetchThreads)
-  {
-    if (recordsPerFetch != null) {
-      return recordsPerFetch;
-    } else {
-      final long memoryToUse = Math.min(
-          MAX_RECORD_FETCH_MEMORY,
-          (long) (maxHeapSize * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
-      );
-
-      final int assumedRecordSize = deaggregate
-                                    ? 
KinesisIndexTaskTuningConfig.ASSUMED_RECORD_SIZE_AGGREGATE
-                                    : 
KinesisIndexTaskTuningConfig.ASSUMED_RECORD_SIZE;
-
-      return Ints.checkedCast(Math.max(1, memoryToUse / assumedRecordSize / 
fetchThreads));
-    }
-  }
-
   @JsonProperty
   @JsonInclude(JsonInclude.Include.NON_DEFAULT)
   public int getFetchDelayMillis()
@@ -262,13 +225,6 @@ public class KinesisIndexTaskIOConfig extends 
SeekableStreamIndexTaskIOConfig<St
     return awsExternalId;
   }
 
-  @JsonProperty
-  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-  public boolean isDeaggregate()
-  {
-    return deaggregate;
-  }
-
   @Override
   public String toString()
   {
@@ -280,11 +236,9 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
            ", minimumMessageTime=" + getMinimumMessageTime() +
            ", maximumMessageTime=" + getMaximumMessageTime() +
            ", endpoint='" + endpoint + '\'' +
-           ", recordsPerFetch=" + recordsPerFetch +
            ", fetchDelayMillis=" + fetchDelayMillis +
            ", awsAssumedRoleArn='" + awsAssumedRoleArn + '\'' +
            ", awsExternalId='" + awsExternalId + '\'' +
-           ", deaggregate=" + deaggregate +
            '}';
   }
 }
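The `getRecordsPerFetchOrDefault` method removed above is the heuristic this PR retires: divide a heap-derived memory budget by a hard-coded assumed record size. A minimal sketch of why the `deaggregate` flag skewed the result by two orders of magnitude; the cap and heap-fraction constants below are placeholders, not Druid's actual values.

```java
// Sketch (not Druid's code) of the retired sizing heuristic.
public class RecordsPerFetchSketch
{
  static final long MAX_FETCH_MEMORY = 100_000_000L; // assumed cap
  static final double HEAP_FRACTION = 0.05;          // assumed heap fraction

  static int recordsPerFetch(long maxHeapSize, int fetchThreads, boolean deaggregate)
  {
    final long memoryToUse = Math.min(MAX_FETCH_MEMORY, (long) (maxHeapSize * HEAP_FRACTION));
    // the hard-coded assumed sizes that made the old logic fragile:
    // 10 KB for plain records, 1 MB for aggregated ones
    final int assumedRecordSize = deaggregate ? 1_000_000 : 10_000;
    return (int) Math.max(1, memoryToUse / assumedRecordSize / fetchThreads);
  }
}
```

With a 1 GB heap and two fetch threads this yields 2,500 records per fetch when `deaggregate` is false but only 25 when it is true, even though the records on the wire are identical — the under-utilization the PR description calls out.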
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index 06d8cf53723..0c3cd717752 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -57,13 +56,14 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
   private static final int DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT = 5000;
   private static final int DEFAULT_RECORD_BUFFER_FULL_WAIT = 5000;
   private static final int DEFAULT_MAX_RECORDS_PER_POLL = 100;
-  private static final int DEFAULT_MAX_RECORDS_PER_POLL_AGGREGATE = 1;
-
+  private static final int DEFAULT_MAX_BYTES_PER_POLL = 1_000_000;
   private final Integer recordBufferSize;
+  private final Integer recordBufferSizeBytes;
   private final int recordBufferOfferTimeout;
   private final int recordBufferFullWait;
   private final Integer fetchThreads;
   private final Integer maxRecordsPerPoll;
+  private final Integer maxBytesPerPoll;
 
   public KinesisIndexTaskTuningConfig(
       @Nullable AppendableIndexSpec appendableIndexSpec,
@@ -81,7 +81,8 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
       Long handoffConditionTimeout,
       Boolean resetOffsetAutomatically,
       Boolean skipSequenceNumberAvailabilityCheck,
-      Integer recordBufferSize,
+      @Deprecated @Nullable Integer recordBufferSize,
+      @Nullable Integer recordBufferSizeBytes,
       Integer recordBufferOfferTimeout,
       Integer recordBufferFullWait,
       Integer fetchThreads,
@@ -89,7 +90,8 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
       @Nullable Boolean logParseExceptions,
       @Nullable Integer maxParseExceptions,
       @Nullable Integer maxSavedParseExceptions,
-      @Nullable Integer maxRecordsPerPoll,
+      @Deprecated @Nullable Integer maxRecordsPerPoll,
+      @Nullable Integer maxBytesPerPoll,
       @Nullable Period intermediateHandoffPeriod
   )
   {
@@ -116,12 +118,14 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
         maxSavedParseExceptions
     );
     this.recordBufferSize = recordBufferSize;
+    this.recordBufferSizeBytes = recordBufferSizeBytes;
     this.recordBufferOfferTimeout = recordBufferOfferTimeout == null
                                     ? DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT
                                     : recordBufferOfferTimeout;
     this.recordBufferFullWait = recordBufferFullWait == null ? DEFAULT_RECORD_BUFFER_FULL_WAIT : recordBufferFullWait;
     this.fetchThreads = fetchThreads; // we handle this being null later
     this.maxRecordsPerPoll = maxRecordsPerPoll;
+    this.maxBytesPerPoll = maxBytesPerPoll;
 
     Preconditions.checkArgument(
         !(super.isResetOffsetAutomatically() && super.isSkipSequenceNumberAvailabilityCheck()),
@@ -145,7 +149,8 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
       @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
       @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
       @JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean skipSequenceNumberAvailabilityCheck,
-      @JsonProperty("recordBufferSize") Integer recordBufferSize,
+      @JsonProperty("recordBufferSize") @Deprecated @Nullable Integer recordBufferSize,
+      @JsonProperty("recordBufferSizeBytes") Integer recordBufferSizeBytes,
       @JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
       @JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
       @JsonProperty("fetchThreads") Integer fetchThreads,
@@ -153,7 +158,8 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
       @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
       @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
-      @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
+      @JsonProperty("maxRecordsPerPoll") @Deprecated @Nullable Integer maxRecordsPerPoll,
+      @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll,
       @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod
   )
   {
@@ -174,6 +180,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
         resetOffsetAutomatically,
         skipSequenceNumberAvailabilityCheck,
         recordBufferSize,
+        recordBufferSizeBytes,
         recordBufferOfferTimeout,
         recordBufferFullWait,
         fetchThreads,
@@ -182,6 +189,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
         maxParseExceptions,
         maxSavedParseExceptions,
         maxRecordsPerPoll,
+        maxBytesPerPoll,
         intermediateHandoffPeriod
     );
   }
@@ -194,18 +202,23 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
     return recordBufferSize;
   }
 
-  public int getRecordBufferSizeOrDefault(final long maxHeapSize, final boolean deaggregate)
+  @Nullable
+  @JsonProperty("recordBufferSizeBytes")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Integer getRecordBufferSizeBytesConfigured()
   {
-    if (recordBufferSize != null) {
-      return recordBufferSize;
+    return recordBufferSizeBytes;
+  }
+
+  public int getRecordBufferSizeBytesOrDefault(final long maxHeapSize)
+  {
+    if (recordBufferSizeBytes != null) {
+      return recordBufferSizeBytes;
     } else {
-      final long memoryToUse = Math.min(
+      return (int) Math.min(
           MAX_RECORD_BUFFER_MEMORY,
           (long) (maxHeapSize * RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION)
       );
-
-      final int assumedRecordSize = deaggregate ? ASSUMED_RECORD_SIZE_AGGREGATE : ASSUMED_RECORD_SIZE;
-      return Ints.checkedCast(Math.max(1, memoryToUse / assumedRecordSize));
     }
   }
 
@@ -237,9 +250,17 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
     return maxRecordsPerPoll;
   }
 
-  public int getMaxRecordsPerPollOrDefault(final boolean deaggregate)
+  @Nullable
+  @JsonProperty("maxBytesPerPoll")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Integer getMaxBytesPerPollConfigured()
+  {
+    return maxBytesPerPoll;
+  }
+
+  public int getMaxBytesPerPollOrDefault()
   {
-    return deaggregate ? DEFAULT_MAX_RECORDS_PER_POLL_AGGREGATE : DEFAULT_MAX_RECORDS_PER_POLL;
+    return maxBytesPerPoll != null ? maxBytesPerPoll : DEFAULT_MAX_BYTES_PER_POLL;
   }
 
   @Override
@@ -262,6 +283,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
         isResetOffsetAutomatically(),
         isSkipSequenceNumberAvailabilityCheck(),
         getRecordBufferSizeConfigured(),
+        getRecordBufferSizeBytesConfigured(),
         getRecordBufferOfferTimeout(),
         getRecordBufferFullWait(),
         getFetchThreads(),
@@ -270,6 +292,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
         getMaxParseExceptions(),
         getMaxSavedParseExceptions(),
         getMaxRecordsPerPollConfigured(),
+        getMaxBytesPerPollConfigured(),
         getIntermediateHandoffPeriod()
     );
   }
@@ -288,9 +311,11 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
     }
     KinesisIndexTaskTuningConfig that = (KinesisIndexTaskTuningConfig) o;
     return Objects.equals(recordBufferSize, that.recordBufferSize) &&
+           Objects.equals(recordBufferSizeBytes, that.recordBufferSizeBytes) &&
            recordBufferOfferTimeout == that.recordBufferOfferTimeout &&
            recordBufferFullWait == that.recordBufferFullWait &&
            Objects.equals(maxRecordsPerPoll, that.maxRecordsPerPoll) &&
+           Objects.equals(maxBytesPerPoll, that.maxBytesPerPoll) &&
            Objects.equals(fetchThreads, that.fetchThreads);
   }
 
@@ -300,10 +325,12 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
     return Objects.hash(
         super.hashCode(),
         recordBufferSize,
+        recordBufferSizeBytes,
         recordBufferOfferTimeout,
         recordBufferFullWait,
         fetchThreads,
-        maxRecordsPerPoll
+        maxRecordsPerPoll,
+        maxBytesPerPoll
     );
   }
 
@@ -324,6 +351,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
            ", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
           ", skipSequenceNumberAvailabilityCheck=" + isSkipSequenceNumberAvailabilityCheck() +
            ", recordBufferSize=" + recordBufferSize +
+           ", recordBufferSizeBytes=" + recordBufferSizeBytes +
            ", recordBufferOfferTimeout=" + recordBufferOfferTimeout +
            ", recordBufferFullWait=" + recordBufferFullWait +
            ", fetchThreads=" + fetchThreads +
@@ -332,6 +360,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
            ", maxParseExceptions=" + getMaxParseExceptions() +
            ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
            ", maxRecordsPerPoll=" + maxRecordsPerPoll +
+           ", maxBytesPerPoll=" + maxBytesPerPoll +
            ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
            '}';
   }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index f0645f8f82c..36047ce429d 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -46,11 +46,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 import org.apache.druid.common.aws.AWSClientUtil;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.common.aws.AWSCredentialsUtils;
 import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -58,6 +58,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.MemoryBoundLinkedBlockingQueue;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -78,12 +79,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -212,7 +211,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
         // used for retrying on InterruptedException
         GetRecordsResult recordsResult = null;
         OrderedPartitionableRecord<String, String, ByteEntity> currRecord;
-
+        long recordBufferOfferWaitMillis;
         try {
 
           if (shardIterator == null) {
@@ -228,45 +227,47 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
 
             recordsResult = null;
 
-            if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
+            recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+            while (!records.offer(
+                new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, 0),
+                recordBufferOfferWaitMillis,
+                TimeUnit.MILLISECONDS
+            )) {
               log.warn("Kinesis records are being processed slower than they are fetched. "
                        + "OrderedPartitionableRecord buffer full, retrying in [%,dms].",
                        recordBufferFullWait);
-              scheduleBackgroundFetch(recordBufferFullWait);
+              recordBufferOfferWaitMillis = recordBufferFullWait;
             }
 
             return;
           }
 
-          recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator(
-              shardIterator).withLimit(recordsPerFetch));
+          recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator));
 
           currentLagMillis = recordsResult.getMillisBehindLatest();
 
           // list will come back empty if there are no records
           for (Record kinesisRecord : recordsResult.getRecords()) {
-
             final List<ByteEntity> data;
 
+            if (deaggregateHandle == null || getDataHandle == null) {
+              throw new ISE("deaggregateHandle or getDataHandle is null!");
+            }
 
-            if (deaggregate) {
-              if (deaggregateHandle == null || getDataHandle == null) {
-                throw new ISE("deaggregateHandle or getDataHandle is null!");
-              }
-
-              data = new ArrayList<>();
+            data = new ArrayList<>();
 
-              final List userRecords = (List) deaggregateHandle.invokeExact(
-                  Collections.singletonList(kinesisRecord)
-              );
+            final List userRecords = (List) deaggregateHandle.invokeExact(
+                Collections.singletonList(kinesisRecord)
+            );
 
-              for (Object userRecord : userRecords) {
-                data.add(new ByteEntity((ByteBuffer) getDataHandle.invoke(userRecord)));
-              }
-            } else {
-              data = Collections.singletonList(new ByteEntity(kinesisRecord.getData()));
+            int recordSize = 0;
+            for (Object userRecord : userRecords) {
+              ByteEntity byteEntity = new ByteEntity((ByteBuffer) getDataHandle.invoke(userRecord));
+              recordSize += byteEntity.getBuffer().array().length;
+              data.add(byteEntity);
             }
 
+
             currRecord = new OrderedPartitionableRecord<>(
                 streamPartition.getStream(),
                 streamPartition.getPartitionId(),
@@ -277,10 +278,11 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
 
             if (log.isTraceEnabled()) {
               log.trace(
-                  "Stream[%s] / partition[%s] / sequenceNum[%s] / 
bufferRemainingCapacity[%d]: %s",
+                  "Stream[%s] / partition[%s] / sequenceNum[%s] / 
bufferByteSize[%d] / bufferRemainingByteCapacity[%d]: %s",
                   currRecord.getStream(),
                   currRecord.getPartitionId(),
                   currRecord.getSequenceNumber(),
+                  records.byteSize(),
                   records.remainingCapacity(),
                   currRecord.getData()
                             .stream()
@@ -292,24 +294,18 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
               );
             }
 
-            // If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
-            // from this message and back off for a bit to let the buffer drain before retrying.
-            if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
+            recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+            while (!records.offer(
+                new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, recordSize),
+                recordBufferOfferWaitMillis,
+                TimeUnit.MILLISECONDS
+            )) {
               log.warn(
                   "Kinesis records are being processed slower than they are 
fetched. "
                   + "OrderedPartitionableRecord buffer full, storing iterator 
and retrying in [%,dms].",
                   recordBufferFullWait
               );
-
-              shardIterator = kinesis.getShardIterator(
-                  currRecord.getStream(),
-                  currRecord.getPartitionId(),
-                  ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-                  currRecord.getSequenceNumber()
-              ).getShardIterator();
-
-              scheduleBackgroundFetch(recordBufferFullWait);
-              return;
+              recordBufferOfferWaitMillis = recordBufferFullWait;
             }
           }
 
@@ -399,15 +395,12 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
   private final MethodHandle getDataHandle;
 
   private final AmazonKinesis kinesis;
-
-  private final int recordsPerFetch;
   private final int fetchDelayMillis;
-  private final boolean deaggregate;
   private final int recordBufferOfferTimeout;
   private final int recordBufferFullWait;
-  private final int maxRecordsPerPoll;
+  private final int maxBytesPerPoll;
   private final int fetchThreads;
-  private final int recordBufferSize;
+  private final int recordBufferSizeBytes;
   private final boolean useEarliestSequenceNumber;
   private final boolean useListShards;
 
@@ -415,7 +408,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
 
   private final ConcurrentMap<StreamPartition<String>, PartitionResource> partitionResources =
       new ConcurrentHashMap<>();
-  private BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> records;
+  private MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> records;
 
   private final boolean backgroundFetchEnabled;
   private volatile boolean closed = false;
@@ -423,56 +416,48 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
 
   public KinesisRecordSupplier(
       AmazonKinesis amazonKinesis,
-      int recordsPerFetch,
       int fetchDelayMillis,
       int fetchThreads,
-      boolean deaggregate,
-      int recordBufferSize,
+      int recordBufferSizeBytes,
       int recordBufferOfferTimeout,
       int recordBufferFullWait,
-      int maxRecordsPerPoll,
+      int maxBytesPerPoll,
       boolean useEarliestSequenceNumber,
       boolean useListShards
   )
   {
     Preconditions.checkNotNull(amazonKinesis);
     this.kinesis = amazonKinesis;
-    this.recordsPerFetch = recordsPerFetch;
     this.fetchDelayMillis = fetchDelayMillis;
-    this.deaggregate = deaggregate;
     this.recordBufferOfferTimeout = recordBufferOfferTimeout;
     this.recordBufferFullWait = recordBufferFullWait;
-    this.maxRecordsPerPoll = maxRecordsPerPoll;
+    this.maxBytesPerPoll = maxBytesPerPoll;
     this.fetchThreads = fetchThreads;
-    this.recordBufferSize = recordBufferSize;
+    this.recordBufferSizeBytes = recordBufferSizeBytes;
     this.useEarliestSequenceNumber = useEarliestSequenceNumber;
     this.useListShards = useListShards;
     this.backgroundFetchEnabled = fetchThreads > 0;
 
-    // the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache.
-    // The work around here is to use reflection to find the deaggregate function in the classpath. See details on the
-    // docs page for more information on how to use deaggregation
-    if (deaggregate) {
-      try {
-        Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
-        MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+    // The deaggregate function is implemented by the amazon-kinesis-client, whose license was formerly not compatible
+    // with Apache. The code here avoids the license issue by using reflection, but is no longer necessary since
+    // amazon-kinesis-client is now Apache-licensed and is now a dependency of Druid. This code could safely be
+    // modified to use regular calls rather than reflection.
+    try {
+      Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
+      MethodHandles.Lookup lookup = MethodHandles.publicLookup();

-        Method deaggregateMethod = kclUserRecordclass.getMethod("deaggregate", List.class);
-        Method getDataMethod = kclUserRecordclass.getMethod("getData");
+      Method deaggregateMethod = kclUserRecordclass.getMethod("deaggregate", List.class);
+      Method getDataMethod = kclUserRecordclass.getMethod("getData");

-        deaggregateHandle = lookup.unreflect(deaggregateMethod);
-        getDataHandle = lookup.unreflect(getDataMethod);
-      }
-      catch (ClassNotFoundException e) {
-        throw new ISE(e, "cannot find class[com.amazonaws.services.kinesis.clientlibrary.types.UserRecord], "
-                         + "note that when using deaggregate=true, you must provide the Kinesis Client Library jar in the classpath");
-      }
-      catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      deaggregateHandle = null;
-      getDataHandle = null;
+      deaggregateHandle = lookup.unreflect(deaggregateMethod);
+      getDataHandle = lookup.unreflect(getDataMethod);
+    }
+    catch (ClassNotFoundException e) {
+      throw new ISE(e, "cannot find class[com.amazonaws.services.kinesis.clientlibrary.types.UserRecord], "
+                       + "note that when using deaggregate=true, you must provide the Kinesis Client Library jar in the classpath");
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
     }
 
     if (backgroundFetchEnabled) {
@@ -488,7 +473,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
       );
     }
 
-    records = new LinkedBlockingQueue<>(recordBufferSize);
+    records = new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
   }
 
   public static AmazonKinesis getAmazonKinesisClient(
@@ -635,23 +620,19 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
     start();
 
     try {
-      int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll);
-
-      List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = new ArrayList<>(expectedSize);
+      List<MemoryBoundLinkedBlockingQueue.ObjectContainer<OrderedPartitionableRecord<String, String, ByteEntity>>> polledRecords = new ArrayList<>();
 
-      Queues.drain(
-          records,
+      records.drain(
           polledRecords,
-          expectedSize,
+          maxBytesPerPoll,
           timeout,
           TimeUnit.MILLISECONDS
       );
 
-      polledRecords = polledRecords.stream()
-                                   .filter(x -> partitionResources.containsKey(x.getStreamPartition()))
-                                   .collect(Collectors.toList());
-
-      return polledRecords;
+      return polledRecords.stream()
+          .filter(x -> partitionResources.containsKey(x.getData().getStreamPartition()))
+          .map(MemoryBoundLinkedBlockingQueue.ObjectContainer::getData)
+          .collect(Collectors.toList());
     }
     catch (InterruptedException e) {
       log.warn(e, "Interrupted while polling");
@@ -1059,11 +1040,22 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
     }
 
    // filter records in buffer and only retain ones whose partition was not seeked
-    BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
+    MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ =
+        new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
 
     records.stream()
-           .filter(x -> !partitions.contains(x.getStreamPartition()))
-           .forEachOrdered(newQ::offer);
+        .filter(x -> !partitions.contains(x.getData().getStreamPartition()))
+        .forEachOrdered(x -> {
+          if (!newQ.offer(x)) {
+            // this should never really happen in practice but adding check here for safety.
+            throw DruidException.defensive("Failed to insert item to new queue when resetting background fetch. "
+                + "[stream: '%s', partitionId: '%s', sequenceNumber: '%s']",
+                x.getData().getStream(),
+                x.getData().getPartitionId(),
+                x.getData().getSequenceNumber()
+            );
+          }
+        });
 
     records = newQ;
 
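The buffer swap in this file replaces a count-bounded `LinkedBlockingQueue` with Druid's `MemoryBoundLinkedBlockingQueue`. The core idea fits in a few lines: each element is wrapped in a container that carries its byte size, and an offer is rejected once the running byte total would exceed the bound. This is a simplified sketch, not Druid's implementation; the real class also supports timed offers and byte-limited draining.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified sketch of a byte-bounded queue in the spirit of Druid's
// MemoryBoundLinkedBlockingQueue.
public class MemoryBoundQueueSketch<T>
{
  // element plus its byte size, standing in for ObjectContainer
  public static class Container<T>
  {
    final T data;
    final long size;

    public Container(T data, long size)
    {
      this.data = data;
      this.size = size;
    }

    public T getData()
    {
      return data;
    }
  }

  private final long maxBytes;
  private long currentBytes = 0;
  private final Deque<Container<T>> queue = new ArrayDeque<>();

  public MemoryBoundQueueSketch(long maxBytes)
  {
    this.maxBytes = maxBytes;
  }

  // reject the element when it would push the total past the byte bound;
  // the fetch loop above responds by backing off and retrying the offer
  public synchronized boolean offer(Container<T> item)
  {
    if (currentBytes + item.size > maxBytes) {
      return false;
    }
    currentBytes += item.size;
    return queue.offer(item);
  }

  public synchronized Container<T> poll()
  {
    final Container<T> item = queue.poll();
    if (item != null) {
      currentBytes -= item.size;
    }
    return item;
  }

  public synchronized long byteSize()
  {
    return currentBytes;
  }
}
```

Bounding by bytes rather than element count is what makes the buffer safe for both tiny plain records and 1 MB aggregated ones without any `deaggregate` hint.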
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
index 29e909eb0bb..81f8b774f04 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
@@ -73,14 +73,12 @@ public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
             ioConfig.getAwsAssumedRoleArn(),
             ioConfig.getAwsExternalId()
         ),
-        ioConfig.getRecordsPerFetch() != null ? ioConfig.getRecordsPerFetch() : DEFAULT_RECORDS_PER_FETCH,
         ioConfig.getFetchDelayMillis(),
         1,
-        ioConfig.isDeaggregate(),
-        tuningConfig.getRecordBufferSizeOrDefault(Runtime.getRuntime().maxMemory(), ioConfig.isDeaggregate()),
+        tuningConfig.getRecordBufferSizeBytesOrDefault(Runtime.getRuntime().maxMemory()),
         tuningConfig.getRecordBufferOfferTimeout(),
         tuningConfig.getRecordBufferFullWait(),
-        tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate()),
+        tuningConfig.getMaxBytesPerPollOrDefault(),
         ioConfig.isUseEarliestSequenceNumber(),
         tuningConfig.isUseListShards()
     );
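With the changes above, polling the buffer switches from "up to `maxRecordsPerPoll` records" to "records until `maxBytesPerPoll` is reached". The drain policy can be sketched as follows; the real `MemoryBoundLinkedBlockingQueue.drain` also honors a timeout, which is omitted here for brevity.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of byte-budgeted draining; not Druid's actual drain implementation.
public class DrainByBytesSketch
{
  // element paired with its byte size, standing in for ObjectContainer
  static class Sized<T>
  {
    final T data;
    final int bytes;

    Sized(T data, int bytes)
    {
      this.data = data;
      this.bytes = bytes;
    }
  }

  // keep taking elements until the cumulative byte size reaches the budget;
  // the element that crosses the budget is still included, then draining stops
  static <T> List<Sized<T>> drain(Iterator<Sized<T>> source, int maxBytes)
  {
    final List<Sized<T>> out = new ArrayList<>();
    int total = 0;
    while (source.hasNext() && total < maxBytes) {
      final Sized<T> next = source.next();
      out.add(next);
      total += next.bytes;
    }
    return out;
  }
}
```

For example, with records of 400 KB each and a 1 MB budget, a single drain takes three records (the third crosses the budget) and leaves the rest for the next poll.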
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index accd8316f66..a142f414762 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -143,11 +143,9 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
         maximumMessageTime,
         ioConfig.getInputFormat(),
         ioConfig.getEndpoint(),
-        ioConfig.getRecordsPerFetch(),
         ioConfig.getFetchDelayMillis(),
         ioConfig.getAwsAssumedRoleArn(),
-        ioConfig.getAwsExternalId(),
-        ioConfig.isDeaggregate()
+        ioConfig.getAwsExternalId()
     );
   }
 
@@ -197,14 +195,12 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
             ioConfig.getAwsAssumedRoleArn(),
             ioConfig.getAwsExternalId()
         ),
-        0, // no records-per-fetch, it is not used
         ioConfig.getFetchDelayMillis(),
         0, // skip starting background fetch, it is not used
-        ioConfig.isDeaggregate(),
-        taskTuningConfig.getRecordBufferSizeOrDefault(Runtime.getRuntime().maxMemory(), ioConfig.isDeaggregate()),
+        taskTuningConfig.getRecordBufferSizeBytesOrDefault(Runtime.getRuntime().maxMemory()),
         taskTuningConfig.getRecordBufferOfferTimeout(),
         taskTuningConfig.getRecordBufferFullWait(),
-        taskTuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate()),
+        taskTuningConfig.getMaxBytesPerPollOrDefault(),
         ioConfig.isUseEarliestSequenceNumber(),
         spec.getSpec().getTuningConfig().isUseListShards()
     );
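The constructor change in `KinesisRecordSupplier` keeps the `MethodHandle`-based deaggregation path but makes it unconditional. The `unreflect` pattern it relies on is standard JDK machinery, illustrated here with `String.toUpperCase` as a stand-in target (the real code looks up `UserRecord.deaggregate` and `UserRecord.getData`):

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;

// Generic illustration of the Class.forName + Lookup.unreflect pattern used
// for UserRecord; the target class and method here are stand-ins.
public class UnreflectSketch
{
  static String upper(String input)
  {
    try {
      // resolve the class by name, as done for the KCL UserRecord class
      Class<?> clazz = Class.forName("java.lang.String");
      MethodHandles.Lookup lookup = MethodHandles.publicLookup();
      // turn the reflective Method into an invokable MethodHandle
      MethodHandle handle = lookup.unreflect(clazz.getMethod("toUpperCase"));
      return (String) handle.invoke(input);
    }
    catch (Throwable t) {
      throw new RuntimeException(t);
    }
  }
}
```

As the updated comment in the diff notes, the reflection indirection is now only historical: amazon-kinesis-client is Apache-licensed and a direct dependency, so direct calls would work equally well.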
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index a568aea263c..9910f22a349 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -71,12 +71,12 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
       @JsonProperty("lateMessageRejectionPeriod") Period 
lateMessageRejectionPeriod,
       @JsonProperty("earlyMessageRejectionPeriod") Period 
earlyMessageRejectionPeriod,
       @JsonProperty("lateMessageRejectionStartDateTime") DateTime 
lateMessageRejectionStartDateTime,
-      @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
+      @JsonProperty("recordsPerFetch") @Deprecated Integer recordsPerFetch,
       @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
       @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
       @JsonProperty("awsExternalId") String awsExternalId,
       @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig 
autoScalerConfig,
-      @JsonProperty("deaggregate") boolean deaggregate
+      @JsonProperty("deaggregate") @Deprecated boolean deaggregate
   )
   {
     super(
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
index 357bc9e57ca..a0a68c14bc0 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
@@ -74,6 +74,8 @@ public class KinesisSupervisorTuningConfig extends 
KinesisIndexTaskTuningConfig
         null,
         null,
         null,
+        null,
+        null,
         null
     );
   }
@@ -98,14 +100,16 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
       @JsonProperty("chatRetries") Long chatRetries,
       @JsonProperty("httpTimeout") Period httpTimeout,
       @JsonProperty("shutdownTimeout") Period shutdownTimeout,
-      @JsonProperty("recordBufferSize") Integer recordBufferSize,
+      @JsonProperty("recordBufferSize") @Deprecated @Nullable Integer recordBufferSize,
+      @JsonProperty("recordBufferSizeBytes") Integer recordBufferSizeBytes,
      @JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
      @JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
      @JsonProperty("fetchThreads") Integer fetchThreads,
      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
      @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
-      @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
+      @JsonProperty("maxRecordsPerPoll") @Deprecated @Nullable Integer maxRecordsPerPoll,
+      @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll,
      @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
      @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration,
       @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@@ -129,6 +133,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
         resetOffsetAutomatically,
         skipSequenceNumberAvailabilityCheck,
         recordBufferSize,
+        recordBufferSizeBytes,
         recordBufferOfferTimeout,
         recordBufferFullWait,
         fetchThreads,
@@ -137,6 +142,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
         maxParseExceptions,
         maxSavedParseExceptions,
         maxRecordsPerPoll,
+        maxBytesPerPoll,
         intermediateHandoffPeriod
     );
 
@@ -225,7 +231,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
            ", chatRetries=" + chatRetries +
            ", httpTimeout=" + httpTimeout +
            ", shutdownTimeout=" + shutdownTimeout +
-           ", recordBufferSize=" + getRecordBufferSizeConfigured() +
+           ", recordBufferSizeBytes=" + getRecordBufferSizeBytesConfigured() +
            ", recordBufferOfferTimeout=" + getRecordBufferOfferTimeout() +
            ", recordBufferFullWait=" + getRecordBufferFullWait() +
            ", fetchThreads=" + getFetchThreads() +
@@ -234,6 +240,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
            ", maxParseExceptions=" + getMaxParseExceptions() +
            ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
            ", maxRecordsPerPoll=" + getMaxRecordsPerPollConfigured() +
+           ", maxBytesPerPoll=" + getMaxBytesPerPollConfigured() +
            ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
           ", repartitionTransitionDuration=" + getRepartitionTransitionDuration() +
            ", useListShards=" + isUseListShards() +
@@ -260,6 +267,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
         isResetOffsetAutomatically(),
         isSkipSequenceNumberAvailabilityCheck(),
         getRecordBufferSizeConfigured(),
+        getRecordBufferSizeBytesConfigured(),
         getRecordBufferOfferTimeout(),
         getRecordBufferFullWait(),
         getFetchThreads(),
@@ -268,6 +276,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
         getMaxParseExceptions(),
         getMaxSavedParseExceptions(),
         getMaxRecordsPerPollConfigured(),
+        getMaxBytesPerPollConfigured(),
         getIntermediateHandoffPeriod()
     );
   }
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
index 0d6af4343aa..3162b2ea0ee 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
@@ -93,13 +93,10 @@ public class KinesisIOConfigTest
     Assert.assertTrue(config.isUseTransaction());
     Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent());
     Assert.assertEquals(config.getEndpoint(), "kinesis.us-east-1.amazonaws.com");
-    Assert.assertNull(config.getRecordsPerFetchConfigured());
-    Assert.assertEquals(config.getRecordsPerFetchOrDefault(1_000_000_000, 4), 1250);
     Assert.assertEquals(config.getFetchDelayMillis(), 0);
     Assert.assertEquals(Collections.emptySet(), config.getStartSequenceNumbers().getExclusivePartitions());
     Assert.assertNull(config.getAwsAssumedRoleArn());
     Assert.assertNull(config.getAwsExternalId());
-    Assert.assertFalse(config.isDeaggregate());
   }
 
   @Test
@@ -115,11 +112,9 @@ public class KinesisIOConfigTest
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n"
                      + "  \"endpoint\": \"kinesis.us-east-2.amazonaws.com\",\n"
-                     + "  \"recordsPerFetch\": 1000,\n"
                      + "  \"fetchDelayMillis\": 1000,\n"
                      + "  \"awsAssumedRoleArn\": \"role\",\n"
-                     + "  \"awsExternalId\": \"awsexternalid\",\n"
-                     + "  \"deaggregate\": true\n"
+                     + "  \"awsExternalId\": \"awsexternalid\"\n"
                      + "}";
 
     KinesisIndexTaskIOConfig config = (KinesisIndexTaskIOConfig) mapper.readValue(
@@ -150,11 +145,9 @@ public class KinesisIOConfigTest
     Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get());
     Assert.assertEquals(config.getEndpoint(), "kinesis.us-east-2.amazonaws.com");
     Assert.assertEquals(config.getStartSequenceNumbers().getExclusivePartitions(), ImmutableSet.of("0"));
-    Assert.assertEquals(1000, (int) config.getRecordsPerFetchConfigured());
     Assert.assertEquals(1000, config.getFetchDelayMillis());
     Assert.assertEquals("role", config.getAwsAssumedRoleArn());
     Assert.assertEquals("awsexternalid", config.getAwsExternalId());
-    Assert.assertTrue(config.isDeaggregate());
   }
 
   @Test
@@ -272,11 +265,9 @@ public class KinesisIOConfigTest
         DateTimes.nowUtc(),
         null,
         "endpoint",
-        1000,
         2000,
         "awsAssumedRoleArn",
-        "awsExternalId",
-        true
+        "awsExternalId"
     );
 
     final byte[] json = mapper.writeValueAsBytes(currentConfig);
@@ -302,11 +293,9 @@ public class KinesisIOConfigTest
     Assert.assertEquals(currentConfig.getMinimumMessageTime(), oldConfig.getMinimumMessageTime());
     Assert.assertEquals(currentConfig.getMaximumMessageTime(), oldConfig.getMaximumMessageTime());
     Assert.assertEquals(currentConfig.getEndpoint(), oldConfig.getEndpoint());
-    Assert.assertEquals((int) currentConfig.getRecordsPerFetchConfigured(), oldConfig.getRecordsPerFetch());
     Assert.assertEquals(currentConfig.getFetchDelayMillis(), oldConfig.getFetchDelayMillis());
     Assert.assertEquals(currentConfig.getAwsAssumedRoleArn(), oldConfig.getAwsAssumedRoleArn());
     Assert.assertEquals(currentConfig.getAwsExternalId(), oldConfig.getAwsExternalId());
-    Assert.assertEquals(currentConfig.isDeaggregate(), oldConfig.isDeaggregate());
   }
 
   @Test
@@ -324,11 +313,9 @@ public class KinesisIOConfigTest
         DateTimes.nowUtc(),
         DateTimes.nowUtc(),
         "endpoint",
-        1000,
         2000,
         "awsAssumedRoleArn",
-        "awsExternalId",
-        true
+        "awsExternalId"
     );
 
     final byte[] json = oldMapper.writeValueAsBytes(oldConfig);
@@ -349,11 +336,9 @@ public class KinesisIOConfigTest
     Assert.assertEquals(oldConfig.getMinimumMessageTime(), currentConfig.getMinimumMessageTime());
     Assert.assertEquals(oldConfig.getMaximumMessageTime(), currentConfig.getMaximumMessageTime());
     Assert.assertEquals(oldConfig.getEndpoint(), currentConfig.getEndpoint());
-    Assert.assertEquals(oldConfig.getRecordsPerFetch(), (int) currentConfig.getRecordsPerFetchConfigured());
     Assert.assertEquals(oldConfig.getFetchDelayMillis(), currentConfig.getFetchDelayMillis());
     Assert.assertEquals(oldConfig.getAwsAssumedRoleArn(), currentConfig.getAwsAssumedRoleArn());
     Assert.assertEquals(oldConfig.getAwsExternalId(), currentConfig.getAwsExternalId());
-    Assert.assertEquals(oldConfig.isDeaggregate(), currentConfig.isDeaggregate());
   }
 
   private static class OldKinesisIndexTaskIoConfig implements IOConfig
@@ -366,12 +351,9 @@ public class KinesisIOConfigTest
     private final Optional<DateTime> minimumMessageTime;
     private final Optional<DateTime> maximumMessageTime;
     private final String endpoint;
-    private final Integer recordsPerFetch;
     private final Integer fetchDelayMillis;
-
     private final String awsAssumedRoleArn;
     private final String awsExternalId;
-    private final boolean deaggregate;
 
     @JsonCreator
     private OldKinesisIndexTaskIoConfig(
@@ -383,11 +365,9 @@ public class KinesisIOConfigTest
         @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
         @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
         @JsonProperty("endpoint") String endpoint,
-        @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
         @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
         @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
-        @JsonProperty("awsExternalId") String awsExternalId,
-        @JsonProperty("deaggregate") boolean deaggregate
+        @JsonProperty("awsExternalId") String awsExternalId
     )
     {
       this.baseSequenceName = baseSequenceName;
@@ -398,11 +378,9 @@ public class KinesisIOConfigTest
       this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
       this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
       this.endpoint = endpoint;
-      this.recordsPerFetch = recordsPerFetch;
       this.fetchDelayMillis = fetchDelayMillis;
       this.awsAssumedRoleArn = awsAssumedRoleArn;
       this.awsExternalId = awsExternalId;
-      this.deaggregate = deaggregate;
     }
 
     @JsonProperty
@@ -453,12 +431,6 @@ public class KinesisIOConfigTest
       return endpoint;
     }
 
-    @JsonProperty
-    public int getRecordsPerFetch()
-    {
-      return recordsPerFetch;
-    }
-
     @JsonProperty
     public int getFetchDelayMillis()
     {
@@ -476,11 +448,5 @@ public class KinesisIOConfigTest
     {
       return awsExternalId;
     }
-
-    @JsonProperty
-    public boolean isDeaggregate()
-    {
-      return deaggregate;
-    }
   }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index 2cba3f54187..da04e3ab0a6 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -76,6 +76,8 @@ public class KinesisIndexTaskSerdeTest
       null,
       null,
       null,
+      null,
+      null,
       null
   );
   private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig(
@@ -90,9 +92,7 @@ public class KinesisIndexTaskSerdeTest
       "endpoint",
       null,
       null,
-      null,
-      null,
-      false
+      null
   );
   private static final String ACCESS_KEY = "test-access-key";
   private static final String SECRET_KEY = "test-secret-key";
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 69516979f3e..e14b6679b09 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -184,6 +184,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
   private Long maxTotalRows = null;
   private final Period intermediateHandoffPeriod = null;
   private int maxRecordsPerPoll;
+  private int maxBytesPerPoll;
 
   @BeforeClass
   public static void setupClass()
@@ -218,6 +219,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
     doHandoff = true;
     reportsFile = File.createTempFile("KinesisIndexTaskTestReports-" + System.currentTimeMillis(), "json");
     maxRecordsPerPoll = 1;
+    maxBytesPerPoll = 1_000_000;
 
     recordSupplier = mock(KinesisRecordSupplier.class);
 
@@ -562,6 +564,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
     // as soon as any segment has more than one record, incremental publishing should happen
     maxRowsPerSegment = 2;
     maxRecordsPerPoll = 1;
+    maxBytesPerPoll = 1_000_000;
 
     recordSupplier.assign(EasyMock.anyObject());
     EasyMock.expectLastCall().anyTimes();
@@ -779,9 +782,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
             "awsEndpoint",
             null,
             null,
-            null,
-            null,
-            false
+            null
         )
     );
 
@@ -843,9 +844,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
             "awsEndpoint",
             null,
             null,
-            null,
-            null,
-            false
+            null
         )
     );
 
@@ -1697,6 +1696,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
   {
     maxRowsPerSegment = 2;
     maxRecordsPerPoll = 1;
+    maxBytesPerPoll = 1_000_000;
     List<OrderedPartitionableRecord<String, String, ByteEntity>> records =
         clone(SINGLE_PARTITION_RECORDS);
 
@@ -1935,9 +1935,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
             "awsEndpoint",
             null,
             null,
-            null,
-            null,
-            false
+            null
         ),
         context
     );
@@ -2099,9 +2097,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
             "awsEndpoint",
             null,
             null,
-            null,
-            null,
-            false
+            null
         ),
         context
     );
@@ -2250,10 +2246,15 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
   public void testComputeFetchThreads()
   {
     final DruidProcessingConfigTest.MockRuntimeInfo runtimeInfo =
-        new DruidProcessingConfigTest.MockRuntimeInfo(3, 1000, 2000);
+        new DruidProcessingConfigTest.MockRuntimeInfo(3, 1000, 10_000_000_000L);
 
     Assert.assertEquals(6, KinesisIndexTask.computeFetchThreads(runtimeInfo, null));
     Assert.assertEquals(2, KinesisIndexTask.computeFetchThreads(runtimeInfo, 2));
+
+    final DruidProcessingConfigTest.MockRuntimeInfo runtimeInfo2 =
+        new DruidProcessingConfigTest.MockRuntimeInfo(3, 1000, 1_000_000_000);
+    Assert.assertEquals(5, KinesisIndexTask.computeFetchThreads(runtimeInfo2, null));
+    Assert.assertEquals(5, KinesisIndexTask.computeFetchThreads(runtimeInfo2, 6));
     Assert.assertThrows(
         IllegalArgumentException.class,
         () -> KinesisIndexTask.computeFetchThreads(runtimeInfo, 0)
@@ -2297,9 +2298,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
             "awsEndpoint",
             null,
             null,
-            null,
-            null,
-            false
+            null
         ),
         null
     );
@@ -2358,10 +2357,12 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
         null,
         null,
         null,
+        null,
         logParseExceptions,
         maxParseExceptions,
         maxSavedParseExceptions,
         maxRecordsPerPoll,
+        maxBytesPerPoll,
         intermediateHandoffPeriod
     );
     return createTask(taskId, dataSchema, ioConfig, tuningConfig, context);
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index cd99521c18e..b61c5cf2ae4 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -77,8 +77,10 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
     Assert.assertFalse(config.isReportParseExceptions());
     Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
-    Assert.assertNull(config.getRecordBufferSizeConfigured());
-    Assert.assertEquals(10000, config.getRecordBufferSizeOrDefault(1_000_000_000, false));
+    Assert.assertNull(config.getRecordBufferSizeBytesConfigured());
+    Assert.assertEquals(100_000_000, config.getRecordBufferSizeBytesOrDefault(2_000_000_000));
+    Assert.assertEquals(100_000_000, config.getRecordBufferSizeBytesOrDefault(1_000_000_000));
+    Assert.assertEquals(10_000_000, config.getRecordBufferSizeBytesOrDefault(100_000_000));
     Assert.assertEquals(5000, config.getRecordBufferOfferTimeout());
     Assert.assertEquals(5000, config.getRecordBufferFullWait());
     Assert.assertNull(config.getFetchThreads());
@@ -98,7 +100,7 @@ public class KinesisIndexTaskTuningConfigTest
                      + "  \"maxPendingPersists\": 100,\n"
                      + "  \"reportParseExceptions\": true,\n"
                      + "  \"handoffConditionTimeout\": 100,\n"
-                     + "  \"recordBufferSize\": 1000,\n"
+                     + "  \"recordBufferSizeBytes\": 1000,\n"
                      + "  \"recordBufferOfferTimeout\": 500,\n"
                      + "  \"recordBufferFullWait\": 500,\n"
                      + "  \"resetOffsetAutomatically\": false,\n"
@@ -125,8 +127,8 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(100, config.getMaxPendingPersists());
     Assert.assertTrue(config.isReportParseExceptions());
     Assert.assertEquals(100, config.getHandoffConditionTimeout());
-    Assert.assertEquals(1000, (int) config.getRecordBufferSizeConfigured());
-    Assert.assertEquals(1000, config.getRecordBufferSizeOrDefault(1_000_000_000, false));
+    Assert.assertEquals(1000, (int) config.getRecordBufferSizeBytesConfigured());
+    Assert.assertEquals(1000, config.getRecordBufferSizeBytesOrDefault(1_000_000_000));
     Assert.assertEquals(500, config.getRecordBufferOfferTimeout());
     Assert.assertEquals(500, config.getRecordBufferFullWait());
     Assert.assertEquals(2, (int) config.getFetchThreads());
@@ -153,6 +155,7 @@ public class KinesisIndexTaskTuningConfigTest
         5L,
         true,
         false,
+        null,
         1000,
         1000,
         500,
@@ -162,6 +165,7 @@ public class KinesisIndexTaskTuningConfigTest
         500,
         500,
         6000,
+        1_000_000,
         new Period("P3D")
     );
 
@@ -190,7 +194,9 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
     Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
     Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured());
+    Assert.assertEquals(base.getRecordBufferSizeBytesConfigured(), deserialized.getRecordBufferSizeBytesConfigured());
     Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured());
+    Assert.assertEquals(base.getMaxBytesPerPollConfigured(), deserialized.getMaxBytesPerPollConfigured());
   }
 
   @Test
@@ -212,6 +218,7 @@ public class KinesisIndexTaskTuningConfigTest
         5L,
         true,
         false,
+        null,
         1000,
         1000,
         500,
@@ -220,6 +227,7 @@ public class KinesisIndexTaskTuningConfigTest
         false,
         500,
         500,
+        1_000_000,
         6000,
         new Period("P3D")
     );
@@ -247,7 +255,7 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
     Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
     Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
-    Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured());
+    Assert.assertEquals(base.getRecordBufferSizeBytesConfigured(), deserialized.getRecordBufferSizeBytesConfigured());
     Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured());
   }
 
@@ -301,6 +309,7 @@ public class KinesisIndexTaskTuningConfigTest
         null,
         null,
         null,
+        null,
         1000,
         500,
         500,
@@ -309,6 +318,7 @@ public class KinesisIndexTaskTuningConfigTest
         null,
         null,
         10,
+        1_000_000,
         null,
         null,
         null,
@@ -327,7 +337,7 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(IndexSpec.DEFAULT, copy.getIndexSpec());
     Assert.assertTrue(copy.isReportParseExceptions());
     Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
-    Assert.assertEquals(1000, (int) copy.getRecordBufferSizeConfigured());
+    Assert.assertEquals(1000, (int) copy.getRecordBufferSizeBytesConfigured());
     Assert.assertEquals(500, copy.getRecordBufferOfferTimeout());
     Assert.assertEquals(500, copy.getRecordBufferFullWait());
     Assert.assertEquals(2, (int) copy.getFetchThreads());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index af0755fcd4b..5fcf81139eb 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -147,8 +147,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
       throw new RuntimeException(e);
     }
   }
-
-  private static int recordsPerFetch;
   private static AmazonKinesis kinesis;
   private static ListShardsResult listShardsResult0;
   private static ListShardsResult listShardsResult1;
@@ -180,7 +178,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
     getRecordsResult1 = createMock(GetRecordsResult.class);
     shard0 = createMock(Shard.class);
     shard1 = createMock(Shard.class);
-    recordsPerFetch = 1;
   }
 
   @After
@@ -219,14 +216,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
-        recordsPerFetch,
         0,
         2,
-        false,
         100,
         5000,
         5000,
-        5,
+        1_000_000,
         true,
         false
     );
@@ -278,14 +273,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
-        recordsPerFetch,
         0,
         2,
-        false,
         100,
         5000,
         5000,
-        5,
+        1_000_000,
         true,
         true
     );
@@ -312,7 +305,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
     Assert.assertEquals(expectedRequest1, capturedRequest1.getValue());
   }
 
-  private static GetRecordsRequest generateGetRecordsReq(String shardIterator, int limit)
+  private static GetRecordsRequest generateGetRecordsReq(String shardIterator)
+  {
+    return new GetRecordsRequest().withShardIterator(shardIterator);
+  }
+
+  private static GetRecordsRequest generateGetRecordsWithLimitReq(String shardIterator, int limit)
   {
     return new GetRecordsRequest().withShardIterator(shardIterator).withLimit(limit);
   }
@@ -326,87 +324,9 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
                   .collect(Collectors.toList());
   }
 
-  @Test
-  public void testPoll() throws InterruptedException
-  {
-    recordsPerFetch = 100;
-
-    EasyMock.expect(kinesis.getShardIterator(
-        EasyMock.anyObject(),
-        EasyMock.eq(SHARD_ID0),
-        EasyMock.anyString(),
-        EasyMock.anyString()
-    )).andReturn(
-        getShardIteratorResult0).anyTimes();
-
-    EasyMock.expect(kinesis.getShardIterator(
-        EasyMock.anyObject(),
-        EasyMock.eq(SHARD_ID1),
-        EasyMock.anyString(),
-        EasyMock.anyString()
-    )).andReturn(
-        getShardIteratorResult1).anyTimes();
-
-    EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
-    EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
-            .andReturn(getRecordsResult0)
-            .anyTimes();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch)))
-            .andReturn(getRecordsResult1)
-            .anyTimes();
-    EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once();
-    EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once();
-    EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
-    EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
-    EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
-    EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
-
-    replayAll();
-
-    Set<StreamPartition<String>> partitions = ImmutableSet.of(
-        StreamPartition.of(STREAM, SHARD_ID0),
-        StreamPartition.of(STREAM, SHARD_ID1)
-    );
-
-
-    recordSupplier = new KinesisRecordSupplier(
-        kinesis,
-        recordsPerFetch,
-        0,
-        2,
-        false,
-        100,
-        5000,
-        5000,
-        100,
-        true,
-        false
-    );
-
-    recordSupplier.assign(partitions);
-    recordSupplier.seekToEarliest(partitions);
-    recordSupplier.start();
-
-    while (recordSupplier.bufferSize() < 12) {
-      Thread.sleep(100);
-    }
-
-    List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = cleanRecords(recordSupplier.poll(
-        POLL_TIMEOUT_MILLIS));
-
-    verifyAll();
-
-    Assert.assertEquals(partitions, recordSupplier.getAssignment());
-    Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
-    Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag());
-  }
-
   @Test
   public void testPollWithKinesisInternalFailure() throws InterruptedException
   {
-    recordsPerFetch = 100;
-
     EasyMock.expect(kinesis.getShardIterator(
             EasyMock.anyObject(),
             EasyMock.eq(SHARD_ID0),
@@ -425,10 +345,10 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
 
     EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
     EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
             .andReturn(getRecordsResult0)
             .anyTimes();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch)))
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR)))
             .andReturn(getRecordsResult1)
             .anyTimes();
     AmazonServiceException getException = new AmazonServiceException("InternalFailure");
@@ -460,14 +380,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
             kinesis,
-            recordsPerFetch,
             0,
             2,
-            false,
-            100,
+            10_000,
             5000,
             5000,
-            100,
+        1_000_000,
             true,
             false
     );
@@ -493,8 +411,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
   @Test
   public void testPollWithKinesisNonRetryableFailure() throws InterruptedException
   {
-    recordsPerFetch = 100;
-
     EasyMock.expect(kinesis.getShardIterator(
         EasyMock.anyObject(),
         EasyMock.eq(SHARD_ID0),
@@ -508,7 +424,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
     getException.setStatusCode(400);
     getException.setServiceName("AmazonKinesis");
     EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
             .andThrow(getException)
             .once();
 
@@ -521,14 +437,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
-        recordsPerFetch,
         0,
         1,
-        false,
         100,
         5000,
         5000,
-        100,
+        1_000_000,
         true,
         false
     );
@@ -556,8 +470,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
   public void testSeek()
       throws InterruptedException
   {
-    recordsPerFetch = 100;
-
     EasyMock.expect(kinesis.getShardIterator(
         EasyMock.anyObject(),
         EasyMock.eq(SHARD_ID0),
@@ -576,10 +488,10 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
 
     EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
     EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
             .andReturn(getRecordsResult0)
             .anyTimes();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch)))
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR)))
             .andReturn(getRecordsResult1)
             .anyTimes();
     
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS.subList(1,
 SHARD0_RECORDS.size())).once();
@@ -600,14 +512,12 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
-        recordsPerFetch,
         0,
         2,
-        false,
-        100,
+        10_000,
         5000,
         5000,
-        100,
+        1_000_000,
         true,
         false
     );
@@ -636,8 +546,6 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
   public void testSeekToLatest()
       throws InterruptedException
   {
-    recordsPerFetch = 100;
-
     EasyMock.expect(kinesis.getShardIterator(
         EasyMock.anyObject(),
         EasyMock.eq(SHARD_ID0),
@@ -668,14 +576,12 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
-        recordsPerFetch,
         0,
         2,
-        false,
         100,
         5000,
         5000,
-        100,
+        1_000_000,
         true,
         false
     );
@@ -703,14 +609,12 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
-        1,
         0,
         2,
-        false,
         100,
         5000,
         5000,
-        5,
+        1_000_000,
         true,
         false
     );
@@ -725,7 +629,6 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
       throws InterruptedException
   {
     // tests that after doing a seek, the now invalid records in buffer is 
cleaned up properly
-    recordsPerFetch = 100;
 
     EasyMock.expect(kinesis.getShardIterator(
         EasyMock.anyObject(),
@@ -745,10 +648,10 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).once();
     
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).once();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, 
recordsPerFetch)))
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR)))
             .andReturn(getRecordsResult1)
             .once();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 
recordsPerFetch)))
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
             .andReturn(getRecordsResult0)
             .once();
     
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS.subList(5,
 SHARD1_RECORDS.size())).once();
@@ -766,14 +669,12 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
-        recordsPerFetch,
         0,
         2,
-        false,
-        100,
+        10_000,
         5000,
         5000,
-        1,
+        1_000_000,
         true,
         false
     );
@@ -816,8 +717,6 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
   @Test
   public void testPollDeaggregate() throws InterruptedException
   {
-    recordsPerFetch = 100;
-
     EasyMock.expect(kinesis.getShardIterator(
         EasyMock.anyObject(),
         EasyMock.eq(SHARD_ID0),
@@ -836,10 +735,10 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
     
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 
recordsPerFetch)))
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
             .andReturn(getRecordsResult0)
             .anyTimes();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, 
recordsPerFetch)))
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR)))
             .andReturn(getRecordsResult1)
             .anyTimes();
     
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once();
@@ -859,14 +758,12 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
-        recordsPerFetch,
         0,
         2,
-        true,
-        100,
+        10_000,
         5000,
         5000,
-        100,
+        1_000_000,
         true,
         false
     );
@@ -922,7 +819,7 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
     
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).once();
 
     AmazonClientException ex = new AmazonClientException(new IOException());
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 
1000)))
+    
EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD0_ITERATOR,
 1000)))
             .andThrow(ex)
             .andReturn(getRecordsResult0)
             .once();
@@ -935,14 +832,12 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
-        recordsPerFetch,
         0,
         2,
-        true,
-        100,
+        10_000,
         5000,
         5000,
-        100,
+        1_000_000,
         true,
         false
     );
@@ -961,7 +856,7 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).times(1);
 
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 
1000)))
+    
EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD0_ITERATOR,
 1000)))
             .andReturn(getRecordsResult0)
             .times(1);
 
@@ -972,14 +867,12 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
-        recordsPerFetch,
         0,
         2,
-        true,
-        100,
+        10_000,
         5000,
         5000,
-        100,
+        1_000_000,
         true,
         false
     );
@@ -1033,12 +926,18 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
     
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 
recordsPerFetch)))
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
             .andReturn(getRecordsResult0)
             .anyTimes();
-    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, 
recordsPerFetch)))
+    EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR)))
             .andReturn(getRecordsResult1)
             .anyTimes();
+    
EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD0_ITERATOR,
 1)))
+        .andReturn(getRecordsResult0)
+        .anyTimes();
+    
EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD1_ITERATOR,
 1)))
+        .andReturn(getRecordsResult1)
+        .anyTimes();
     
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).times(2);
     
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS_EMPTY).times(2);
     
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
@@ -1055,14 +954,12 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
 
     recordSupplier = new KinesisRecordSupplier(
         kinesis,
-        recordsPerFetch,
         0,
         2,
-        true,
-        100,
+        10_000,
         5000,
         5000,
-        100,
+        1_000_000,
         true,
         false
     );
@@ -1110,17 +1007,16 @@ public class KinesisRecordSupplierTest extends 
EasyMockSupport
   public void testIsOffsetAvailable()
   {
     AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
-    KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
-                                                             recordsPerFetch,
-                                                             0,
-                                                             2,
-                                                             false,
-                                                             100,
-                                                             5000,
-                                                             5000,
-                                                             5,
-                                                             true,
-                                                             false
+    KinesisRecordSupplier target = new KinesisRecordSupplier(
+        mockKinesis,
+        0,
+        2,
+        100,
+        5000,
+        5000,
+        1_000_000,
+        true,
+        false
     );
     StreamPartition<String> partition = new StreamPartition<>(STREAM, 
SHARD_ID0);
 
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
index fabde1852aa..4de35cf5e5d 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
@@ -70,11 +70,9 @@ public class KinesisSupervisorIOConfigTest
     Assert.assertFalse("lateMessageRejectionPeriod", 
config.getLateMessageRejectionPeriod().isPresent());
     Assert.assertFalse("earlyMessageRejectionPeriod", 
config.getEarlyMessageRejectionPeriod().isPresent());
     Assert.assertFalse("lateMessageRejectionStartDateTime", 
config.getLateMessageRejectionStartDateTime().isPresent());
-    Assert.assertNull(config.getRecordsPerFetch());
     Assert.assertEquals(0, config.getFetchDelayMillis());
     Assert.assertNull(config.getAwsAssumedRoleArn());
     Assert.assertNull(config.getAwsExternalId());
-    Assert.assertFalse(config.isDeaggregate());
   }
 
   @Test
@@ -94,11 +92,9 @@ public class KinesisSupervisorIOConfigTest
                      + "  \"completionTimeout\": \"PT45M\",\n"
                      + "  \"lateMessageRejectionPeriod\": \"PT1H\",\n"
                      + "  \"earlyMessageRejectionPeriod\": \"PT1H\",\n"
-                     + "  \"recordsPerFetch\": 4000,\n"
                      + "  \"fetchDelayMillis\": 1000,\n"
                      + "  \"awsAssumedRoleArn\": \"role\",\n"
-                     + "  \"awsExternalId\": \"awsexternalid\",\n"
-                     + "  \"deaggregate\": true\n"
+                     + "  \"awsExternalId\": \"awsexternalid\"\n"
                      + "}";
 
     KinesisSupervisorIOConfig config = mapper.readValue(
@@ -117,11 +113,9 @@ public class KinesisSupervisorIOConfigTest
     Assert.assertEquals(Duration.standardMinutes(45), 
config.getCompletionTimeout());
     Assert.assertEquals(Duration.standardHours(1), 
config.getLateMessageRejectionPeriod().get());
     Assert.assertEquals(Duration.standardHours(1), 
config.getEarlyMessageRejectionPeriod().get());
-    Assert.assertEquals((Integer) 4000, config.getRecordsPerFetch());
     Assert.assertEquals(1000, config.getFetchDelayMillis());
     Assert.assertEquals("role", config.getAwsAssumedRoleArn());
     Assert.assertEquals("awsexternalid", config.getAwsExternalId());
-    Assert.assertTrue(config.isDeaggregate());
   }
 
   @Test
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index c9b6406bd3f..87297dd8f28 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -208,6 +208,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
+        null,
+        null,
         null
     );
     rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
@@ -318,7 +320,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
             null,
             false,
             null,
-            null,
             autoScalerConfig
             );
     KinesisSupervisorSpec kinesisSupervisorSpec = 
supervisor.getKinesisSupervisorSpec();
@@ -392,7 +393,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
             null,
             false,
             null,
-            null,
             autoScalerConfig
     );
 
@@ -509,26 +509,26 @@ public class KinesisSupervisorTest extends EasyMockSupport
   {
     // create KinesisSupervisorIOConfig with autoScalerConfig null
     KinesisSupervisorIOConfig 
kinesisSupervisorIOConfigWithNullAutoScalerConfig = new 
KinesisSupervisorIOConfig(
-            STREAM,
-            INPUT_FORMAT,
-            "awsEndpoint",
-            null,
-            1,
-            1,
-            new Period("PT30M"),
-            new Period("P1D"),
-            new Period("PT30S"),
-            false,
-            new Period("PT30M"),
-            null,
-            null,
-            null,
-            100,
-            1000,
-            null,
-            null,
-            null,
-            false
+        STREAM,
+        INPUT_FORMAT,
+        "awsEndpoint",
+        null,
+        1,
+        1,
+        new Period("PT30M"),
+        new Period("P1D"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        100,
+        1000,
+        null,
+        null,
+        null,
+        false
     );
 
     AutoScalerConfig autoscalerConfigNull = 
kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoScalerConfig();
@@ -536,26 +536,26 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     // create KinesisSupervisorIOConfig with autoScalerConfig Empty
     KinesisSupervisorIOConfig 
kinesisSupervisorIOConfigWithEmptyAutoScalerConfig = new 
KinesisSupervisorIOConfig(
-            STREAM,
-            INPUT_FORMAT,
-            "awsEndpoint",
-            null,
-            1,
-            1,
-            new Period("PT30M"),
-            new Period("P1D"),
-            new Period("PT30S"),
-            false,
-            new Period("PT30M"),
-            null,
-            null,
-            null,
-            100,
-            1000,
-            null,
-            null,
-             OBJECT_MAPPER.convertValue(new HashMap<>(), 
AutoScalerConfig.class),
-            false
+        STREAM,
+        INPUT_FORMAT,
+        "awsEndpoint",
+        null,
+        1,
+        1,
+        new Period("PT30M"),
+        new Period("P1D"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        100,
+        1000,
+        null,
+        null,
+        OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
+        false
     );
 
     AutoScalerConfig autoscalerConfig = 
kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoScalerConfig();
@@ -3740,7 +3740,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         new Period("P1D"),
         new Period("P1D"),
         false,
-        42,
         1000,
         true
     );
@@ -3838,7 +3837,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         new Period("P1D"),
         new Period("P1D"),
         false,
-        42,
         1000,
         false
     );
@@ -3924,7 +3922,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         new Period("P1D"),
         false,
         42,
-        42,
         dataSchema,
         tuningConfig
     );
@@ -3967,7 +3964,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
+        null,
         42, // This property is different from tuningConfig
+        1_000_000,
         null,
         null,
         null,
@@ -4074,7 +4073,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         new Period("P1D"),
         false,
         42,
-        42,
         dataSchema,
         tuningConfig
     );
@@ -5149,6 +5147,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
+        null,
+        null,
         null
     );
 
@@ -5198,7 +5198,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         earlyMessageRejectionPeriod,
         false,
         null,
-        null,
         null
     );
   }
@@ -5211,7 +5210,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
       Period lateMessageRejectionPeriod,
       Period earlyMessageRejectionPeriod,
       boolean suspended,
-      Integer recordsPerFetch,
       Integer fetchDelayMillis,
       AutoScalerConfig autoScalerConfig
   )
@@ -5231,7 +5229,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         null,
-        recordsPerFetch,
+        null,
         fetchDelayMillis,
         null,
         null,
@@ -5301,7 +5299,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
       Period lateMessageRejectionPeriod,
       Period earlyMessageRejectionPeriod,
       boolean suspended,
-      Integer recordsPerFetch,
       Integer fetchDelayMillis,
       boolean isTaskCurrentReturn
   )
@@ -5321,7 +5318,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         null,
-        recordsPerFetch,
+        null,
         fetchDelayMillis,
         null,
         null,
@@ -5389,7 +5386,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
       Period lateMessageRejectionPeriod,
       Period earlyMessageRejectionPeriod,
       boolean suspended,
-      Integer recordsPerFetch,
       Integer fetchDelayMillis,
       DataSchema dataSchema,
       KinesisSupervisorTuningConfig tuningConfig
@@ -5410,7 +5406,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
         null,
-        recordsPerFetch,
+        null,
         fetchDelayMillis,
         null,
         null,
@@ -5559,9 +5555,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
             "awsEndpoint",
             null,
             null,
-            null,
-            null,
-            false
+            null
         ),
         Collections.emptyMap(),
         false,
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
index 70611266efe..ac84d2105cd 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
@@ -52,6 +52,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends 
KinesisIndexTaskTu
       @JsonProperty("resetOffsetAutomatically") Boolean 
resetOffsetAutomatically,
       @JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean 
skipSequenceNumberAvailabilityCheck,
       @JsonProperty("recordBufferSize") Integer recordBufferSize,
+      @JsonProperty("recordBufferSizeBytes") Integer recordBufferSizeBytes,
       @JsonProperty("recordBufferOfferTimeout") Integer 
recordBufferOfferTimeout,
       @JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
       @JsonProperty("fetchThreads") Integer fetchThreads,
@@ -60,6 +61,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends 
KinesisIndexTaskTu
       @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer 
maxSavedParseExceptions,
       @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
+      @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll,
       @JsonProperty("intermediateHandoffPeriod") @Nullable Period 
intermediateHandoffPeriod,
       @JsonProperty("extra") String extra
   )
@@ -81,6 +83,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends 
KinesisIndexTaskTu
         resetOffsetAutomatically,
         skipSequenceNumberAvailabilityCheck,
         recordBufferSize,
+        recordBufferSizeBytes,
         recordBufferOfferTimeout,
         recordBufferFullWait,
         fetchThreads,
@@ -89,6 +92,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends 
KinesisIndexTaskTu
         maxParseExceptions,
         maxSavedParseExceptions,
         maxRecordsPerPoll,
+        maxBytesPerPoll,
         intermediateHandoffPeriod
     );
     this.extra = extra;
@@ -113,6 +117,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig 
extends KinesisIndexTaskTu
         base.isResetOffsetAutomatically(),
         base.isSkipSequenceNumberAvailabilityCheck(),
         base.getRecordBufferSizeConfigured(),
+        base.getRecordBufferSizeBytesConfigured(),
         base.getRecordBufferOfferTimeout(),
         base.getRecordBufferFullWait(),
         base.getFetchThreads(),
@@ -121,6 +126,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig 
extends KinesisIndexTaskTu
         base.getMaxParseExceptions(),
         base.getMaxSavedParseExceptions(),
         base.getMaxRecordsPerPollConfigured(),
+        base.getMaxBytesPerPollConfigured(),
         base.getIntermediateHandoffPeriod()
     );
     this.extra = extra;
diff --git 
a/processing/src/main/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueue.java
 
b/processing/src/main/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueue.java
new file mode 100644
index 00000000000..49105f3a833
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueue.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Stream;
+
+/**
+ * Similar to LinkedBlockingQueue but can be bounded by the total byte size of 
the items present in the queue
+ * rather than number of items.
+ */
+public class MemoryBoundLinkedBlockingQueue<T>
+{
+  private final long memoryBound;
+  private final AtomicLong currentMemory;
+  private final LinkedBlockingQueue<ObjectContainer<T>> queue;
+  private final ReentrantLock putLock = new ReentrantLock();
+  private final Condition notFull = putLock.newCondition();
+
+  public MemoryBoundLinkedBlockingQueue(long memoryBound)
+  {
+    this(new LinkedBlockingQueue<>(), memoryBound);
+  }
+
+  @VisibleForTesting
+  MemoryBoundLinkedBlockingQueue(LinkedBlockingQueue<ObjectContainer<T>> 
queue, long memoryBound)
+  {
+    this.memoryBound = memoryBound;
+    this.currentMemory = new AtomicLong(0L);
+    this.queue = queue;
+  }
+
+  // Attempts to add the item without blocking; returns false if adding it would exceed the memory bound.
+  public boolean offer(ObjectContainer<T> item)
+  {
+    final long itemLength = item.getSize();
+
+    if (currentMemory.addAndGet(itemLength) <= memoryBound) {
+      if (queue.offer(item)) {
+        return true;
+      }
+    }
+    currentMemory.addAndGet(-itemLength);
+    return false;
+  }
+
+  public boolean offer(ObjectContainer<T> item, long timeout, TimeUnit unit) 
throws InterruptedException
+  {
+    final long itemLength = item.getSize();
+
+    long nanos = unit.toNanos(timeout);
+    final ReentrantLock putLock = this.putLock;
+    putLock.lockInterruptibly();
+    try {
+      while (currentMemory.get() + itemLength > memoryBound) {
+        if (nanos <= 0L) {
+          return false;
+        }
+        nanos = notFull.awaitNanos(nanos);
+      }
+      if (currentMemory.addAndGet(itemLength) <= memoryBound) {
+        if (queue.offer(item, timeout, unit)) {
+          return true;
+        }
+      }
+    }
+    catch (InterruptedException e) {
+      currentMemory.addAndGet(-itemLength);
+      throw e;
+    }
+    finally {
+      putLock.unlock();
+    }
+    currentMemory.addAndGet(-itemLength);
+    return false;
+  }
+
+  // blocks until at least one item is available to take
+  public ObjectContainer<T> take() throws InterruptedException
+  {
+    final ObjectContainer<T> ret = queue.take();
+    currentMemory.addAndGet(-ret.getSize());
+    signalNotFull();
+    return ret;
+  }
+
+  public Stream<ObjectContainer<T>> stream()
+  {
+    return queue.stream();
+  }
+
+  /**
+   * Drains up to the specified number of bytes' worth of items from the queue into the provided buffer. At least
+   * one record is drained if any becomes available within the timeout, even if its size exceeds the byte limit.
+   *
+   * @param buffer       The buffer to drain queue items into.
+   * @param bytesToDrain The number of bytes to drain from the queue
+   * @param timeout      The maximum time allowed to drain the queue
+   * @param unit         The time unit of the timeout.
+   *
+   * @return The number of items drained from the queue.
+   * @throws InterruptedException if interrupted while waiting for an item
+   */
+  public int drain(Collection<? super ObjectContainer<T>> buffer, int 
bytesToDrain, long timeout, TimeUnit unit)
+      throws InterruptedException
+  {
+    Preconditions.checkNotNull(buffer);
+    boolean signalNotFull = false;
+    try {
+      long deadline = System.nanoTime() + unit.toNanos(timeout);
+      int added = 0;
+      long bytesAdded = 0;
+      while (bytesAdded < bytesToDrain) {
+        ObjectContainer<T> e = queue.poll(deadline - System.nanoTime(), 
TimeUnit.NANOSECONDS);
+        if (e == null) {
+          break;
+        }
+        currentMemory.addAndGet(-e.getSize());
+        signalNotFull = true;
+        buffer.add(e);
+        ++added;
+        bytesAdded += e.getSize();
+        e = queue.peek();
+        if (e != null && (bytesAdded + e.getSize()) > bytesToDrain) {
+          break;
+        }
+      }
+      return added;
+    }
+    finally {
+      if (signalNotFull) {
+        signalNotFull();
+      }
+    }
+  }
+
+  public int size()
+  {
+    return queue.size();
+  }
+
+  public long byteSize()
+  {
+    return currentMemory.get();
+  }
+
+  public long remainingCapacity()
+  {
+    return memoryBound - currentMemory.get();
+  }
+
+  private void signalNotFull()
+  {
+    final ReentrantLock putLock = this.putLock;
+    putLock.lock();
+    try {
+      notFull.signal();
+    }
+    finally {
+      putLock.unlock();
+    }
+  }
+
+  public static class ObjectContainer<T>
+  {
+    private final T data;
+    private final long size;
+
+    public ObjectContainer(T data, long size)
+    {
+      this.data = data;
+      this.size = size;
+    }
+
+    public T getData()
+    {
+      return data;
+    }
+
+    public long getSize()
+    {
+      return size;
+    }
+  }
+}
+
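The core idea of the new queue above is optimistic byte accounting: `offer` reserves the item's size on an `AtomicLong` first, then rolls the reservation back if the bound is exceeded or the underlying offer fails. A minimal standalone sketch of that pattern follows; the class and method names here (`ByteBoundedQueue`) are illustrative only and are not part of the committed Druid code:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of a byte-bounded queue: reserve bytes optimistically,
// then undo the reservation if the memory bound would be exceeded.
class ByteBoundedQueue<T> {
    private final long memoryBound;
    private final AtomicLong currentMemory = new AtomicLong(0L);
    private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

    ByteBoundedQueue(long memoryBound) {
        this.memoryBound = memoryBound;
    }

    boolean offer(T item, long size) {
        // Reserve the bytes first; if we stayed under the bound and the
        // (unbounded) inner queue accepts the item, the reservation stands.
        if (currentMemory.addAndGet(size) <= memoryBound && queue.offer(item)) {
            return true;
        }
        currentMemory.addAndGet(-size); // roll back the failed reservation
        return false;
    }

    long byteSize() {
        return currentMemory.get();
    }
}
```

Because the reservation happens before the item is enqueued, concurrent producers can never push the tracked byte total past the bound for long: a loser of the race simply rolls back and reports failure, which is what lets the Kinesis fetch threads size the buffer by actual record bytes instead of a hard-coded per-record estimate.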
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueueTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueueTest.java
new file mode 100644
index 00000000000..ec36d83f250
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueueTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class MemoryBoundLinkedBlockingQueueTest
+{
+  @Test
+  public void test_offer_emptyQueueWithEnoughCapacity_true()
+  {
+
+    long byteCapacity = 10L;
+    MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity, 
ImmutableList.of());
+    byte[] item = "item".getBytes(StandardCharsets.UTF_8);
+
+    boolean succeeds = queue.offer(new 
MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item, item.length));
+
+    long expectedByteSize = item.length;
+    Assert.assertTrue(succeeds);
+    Assert.assertEquals(1, queue.size());
+    Assert.assertEquals(expectedByteSize, queue.byteSize());
+    Assert.assertEquals(byteCapacity - item.length, queue.remainingCapacity());
+  }
+
+  @Test
+  public void test_offer_nonEmptyQueueWithEnoughCapacity_true()
+  {
+
+    long byteCapacity = 10L;
+    byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+    byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+    Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items = buildItemContainers(
+        ImmutableList.of(item1)
+    );
+    MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity, items);
+
+    boolean succeeds = queue.offer(new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item2, item2.length));
+
+    long expectedByteSize = item1.length + item2.length;
+    Assert.assertTrue(succeeds);
+    Assert.assertEquals(2, queue.size());
+    Assert.assertEquals(expectedByteSize, queue.byteSize());
+    Assert.assertEquals(byteCapacity - expectedByteSize, queue.remainingCapacity());
+  }
+
+  @Test
+  public void test_offer_queueWithoutEnoughCapacity_false()
+  {
+
+    long byteCapacity = 7L;
+    byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+    byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+    Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items = buildItemContainers(
+        ImmutableList.of(item1)
+    );
+    MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity, items);
+
+    boolean succeeds = queue.offer(new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item2, item2.length));
+
+    long expectedByteSize = item1.length;
+    Assert.assertFalse(succeeds);
+    Assert.assertEquals(1, queue.size());
+    Assert.assertEquals(expectedByteSize, queue.byteSize());
+    Assert.assertEquals(byteCapacity - expectedByteSize, queue.remainingCapacity());
+  }
+
+  @Test
+  public void test_offerWithTimeLimit_interruptedExceptionThrown_throws()
+  {
+    long byteCapacity = 10L;
+    MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(
+        byteCapacity,
+        ImmutableList.of(),
+        new InterruptedExceptionThrowingQueue()
+    );
+    byte[] item = "item".getBytes(StandardCharsets.UTF_8);
+
+    Assert.assertThrows(
+        InterruptedException.class,
+        () -> queue.offer(
+            new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item, item.length),
+            1L,
+            TimeUnit.MILLISECONDS
+        )
+    );
+
+    Assert.assertEquals(0, queue.size());
+    Assert.assertEquals(0L, queue.byteSize());
+    Assert.assertEquals(byteCapacity, queue.remainingCapacity());
+  }
+
+  @Test
+  public void test_offerWithTimeLimit_fullQueue_waitsTime() throws InterruptedException
+  {
+    long timeoutMillis = 2000L;
+    long byteCapacity = 10L;
+    byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+    byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+    Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items = buildItemContainers(
+        ImmutableList.of(item1, item2)
+    );
+    MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(
+        byteCapacity,
+        items,
+        new InterruptedExceptionThrowingQueue()
+    );
+    byte[] item = "item".getBytes(StandardCharsets.UTF_8);
+    long start = System.currentTimeMillis();
+    boolean succeeds = queue.offer(
+        new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item, item.length),
+        timeoutMillis,
+        TimeUnit.MILLISECONDS
+    );
+    long end = System.currentTimeMillis();
+
+    Assert.assertFalse(succeeds);
+    Assert.assertTrue(
+        StringUtils.format(
+            "offer only waited at most [%d] nanos instead of expected [%d] nanos",
+            TimeUnit.MILLISECONDS.toNanos(end - start),
+            TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
+        ),
+        TimeUnit.MILLISECONDS.toNanos(end - start) >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
+    );
+    Assert.assertEquals(2, queue.size());
+    Assert.assertEquals(10L, queue.byteSize());
+    Assert.assertEquals(0L, queue.remainingCapacity());
+  }
+
+  @Test
+  public void test_take_nonEmptyQueue_expected() throws InterruptedException
+  {
+
+    long byteCapacity = 10L;
+    byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+    byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+    MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]> item1Container =
+        new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item1, item1.length);
+    MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]> item2Container =
+        new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item2, item2.length);
+    MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity, ImmutableList.of());
+    Assert.assertTrue(queue.offer(item1Container));
+    Assert.assertTrue(queue.offer(item2Container));
+
+    MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]> takenItem = queue.take();
+    long expectedByteSize = item2.length;
+
+    Assert.assertSame(item1Container, takenItem);
+    Assert.assertEquals(1, queue.size());
+    Assert.assertEquals(expectedByteSize, queue.byteSize());
+    Assert.assertEquals(byteCapacity - expectedByteSize, queue.remainingCapacity());
+  }
+
+  @Test
+  public void test_drain_emptyQueue_succeeds() throws InterruptedException
+  {
+
+    long byteCapacity = 7L;
+    MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity, ImmutableList.of());
+    Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> buffer = new ArrayList<>();
+
+    int numAdded = queue.drain(buffer, 1, 1, TimeUnit.SECONDS);
+
+    Assert.assertTrue(numAdded == 0 && numAdded == buffer.size());
+    Assert.assertEquals(0, queue.size());
+    Assert.assertEquals(0L, queue.byteSize());
+    Assert.assertEquals(byteCapacity, queue.remainingCapacity());
+  }
+
+  @Test
+  public void test_drain_queueWithOneItem_succeeds() throws InterruptedException
+  {
+
+    long byteCapacity = 7L;
+    byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+    Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items = buildItemContainers(
+        ImmutableList.of(item1)
+    );
+    MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity, items);
+    Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> buffer = new ArrayList<>();
+
+    int numAdded = queue.drain(buffer, 1, 1, TimeUnit.MINUTES);
+
+    Assert.assertTrue(numAdded == 1 && numAdded == buffer.size());
+    Assert.assertEquals(0, queue.size());
+    Assert.assertEquals(0L, queue.byteSize());
+    Assert.assertEquals(byteCapacity, queue.remainingCapacity());
+  }
+
+  @Test
+  public void test_drain_queueWithMultipleItems_succeeds() throws InterruptedException
+  {
+
+    long byteCapacity = 15L;
+    byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+    byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+    byte[] item3 = "item3".getBytes(StandardCharsets.UTF_8);
+    Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items = buildItemContainers(
+        ImmutableList.of(item1, item2, item3)
+    );
+    MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity, items, new NotAllDrainedQueue());
+    Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> buffer = new ArrayList<>();
+
+    int numAdded = queue.drain(buffer, 10, 1, TimeUnit.MINUTES);
+
+    Assert.assertTrue(numAdded == 2 && numAdded == buffer.size());
+    Assert.assertEquals(1, queue.size());
+    Assert.assertEquals(item3.length, queue.byteSize());
+    Assert.assertEquals(byteCapacity - item3.length, queue.remainingCapacity());
+  }
+
+  @Test
+  public void test_drain_queueWithFirstItemSizeGreaterThanLimit_succeeds() throws InterruptedException
+  {
+
+    long byteCapacity = 15L;
+    byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+    byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+    byte[] item3 = "item3".getBytes(StandardCharsets.UTF_8);
+    Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items = buildItemContainers(
+        ImmutableList.of(item1, item2, item3)
+    );
+    MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity, items, new NotAllDrainedQueue());
+    Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> buffer = new ArrayList<>();
+
+    int numAdded = queue.drain(buffer, item1.length - 1, 1, TimeUnit.MINUTES);
+
+    Assert.assertTrue(numAdded == 1 && numAdded == buffer.size());
+    Assert.assertEquals(2, queue.size());
+    Assert.assertEquals(item2.length + item3.length, queue.byteSize());
+    Assert.assertEquals(byteCapacity - (item2.length + item3.length), queue.remainingCapacity());
+  }
+
+  private static <T> MemoryBoundLinkedBlockingQueue<T> setupQueue(
+      long byteCapacity,
+      Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<T>> items
+  )
+  {
+    return setupQueue(byteCapacity, items, null);
+  }
+
+  private static <T> MemoryBoundLinkedBlockingQueue<T> setupQueue(
+      long byteCapacity,
+      Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<T>> items,
+      @Nullable LinkedBlockingQueue<MemoryBoundLinkedBlockingQueue.ObjectContainer<T>> underlyingQueue
+  )
+  {
+    Assert.assertTrue(getTotalSizeOfItems(items) <= byteCapacity);
+    MemoryBoundLinkedBlockingQueue<T> queue = underlyingQueue != null ?
+        new MemoryBoundLinkedBlockingQueue<>(underlyingQueue, byteCapacity) :
+        new MemoryBoundLinkedBlockingQueue<>(byteCapacity);
+    items.forEach(i -> Assert.assertTrue(queue.offer(i)));
+    return queue;
+  }
+
+  private static Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> buildItemContainers(
+      Collection<byte[]> items
+  )
+  {
+    return items.stream()
+        .map(i -> new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(i, i.length))
+        .collect(Collectors.toList());
+  }
+
+  private static <T> long getTotalSizeOfItems(Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<T>> items)
+  {
+    return items.stream().mapToLong(MemoryBoundLinkedBlockingQueue.ObjectContainer::getSize).sum();
+  }
+
+  static class InterruptedExceptionThrowingQueue
+      extends LinkedBlockingQueue<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>>
+  {
+    @Override
+    public boolean offer(MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]> item, long timeout, TimeUnit unit)
+        throws InterruptedException
+    {
+      throw new InterruptedException("exception thrown");
+    }
+  }
+
+  static class NotAllDrainedQueue
+      extends LinkedBlockingQueue<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>>
+  {
+    @Override
+    public int drainTo(Collection<? super MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> c, int maxElements)
+    {
+      MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]> firstItem = this.poll();
+      c.add(firstItem);
+      return 1;
+    }
+  }
+}
diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
index 3594f70c992..41f77593d28 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
@@ -1222,13 +1222,6 @@ export function getIoConfigTuningFormFields(
             </>
           ),
         },
-        {
-          name: 'recordsPerFetch',
-          type: 'number',
-          defaultValue: 4000,
-          defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
-          info: <>The number of records to request per GetRecords call to Kinesis.</>,
-        },
         {
           name: 'pollTimeout',
           type: 'number',
@@ -1249,13 +1242,6 @@ export function getIoConfigTuningFormFields(
           defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
           info: <>Time in milliseconds to wait between subsequent GetRecords calls to Kinesis.</>,
         },
-        {
-          name: 'deaggregate',
-          type: 'boolean',
-          defaultValue: false,
-          defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
-          info: <>Whether to use the de-aggregate function of the KCL.</>,
-        },
         {
           name: 'startDelay',
           type: 'duration',
@@ -1443,7 +1429,7 @@ export interface TuningConfig {
   offsetFetchPeriod?: string;
   maxParseExceptions?: number;
   maxSavedParseExceptions?: number;
-  recordBufferSize?: number;
+  recordBufferSizeBytes?: number;
   recordBufferOfferTimeout?: number;
   recordBufferFullWait?: number;
   fetchThreads?: number;
@@ -2055,13 +2041,13 @@ const TUNING_FORM_FIELDS: Field<IngestionSpec>[] = [
     ),
   },
   {
-    name: 'spec.tuningConfig.recordBufferSize',
+    name: 'spec.tuningConfig.recordBufferSizeBytes',
     type: 'number',
-    defaultValue: 10000,
+    defaultValue: 100000000,
     defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
     info: (
       <>
-        Size of the buffer (number of events) used between the Kinesis fetch threads and the main
+        Size of the buffer (heap memory bytes) used between the Kinesis fetch threads and the main
         ingestion thread.
       </>
     ),
@@ -2106,15 +2092,15 @@ const TUNING_FORM_FIELDS: Field<IngestionSpec>[] = [
     ),
   },
   {
-    name: 'spec.tuningConfig.maxRecordsPerPoll',
+    name: 'spec.tuningConfig.maxBytesPerPoll',
     type: 'number',
-    defaultValue: 100,
+    defaultValue: 1000000,
     defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
     hideInMore: true,
     info: (
       <>
-        The maximum number of records/events to be fetched from buffer per poll. The actual maximum
-        will be <Code>max(maxRecordsPerPoll, max(bufferSize, 1))</Code>.
+        The maximum number of bytes to be fetched from buffer per poll. At least one record will be
+        fetched regardless of config.
       </>
     ),
   },
diff --git a/website/.spelling b/website/.spelling
index 59585a52b2d..f507d8e646f 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1125,6 +1125,7 @@ LZ4LZFuncompressedLZ4LZ4LZFuncompressednoneLZ4autolongsautolongslongstypeconcise
 deaggregate
 druid-kinesis-indexing-service
 maxRecordsPerPoll
+maxBytesPerPoll
 maxRecordsPerPollrecordsPerFetchfetchDelayMillisreplicasfetchDelayMillisrecordsPerFetchfetchDelayMillismaxRecordsPerPollamazon-kinesis-client1
 numKinesisShards
 numProcessors

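For readers skimming the patch: the new test file above exercises a queue bounded by total byte size rather than element count, which is what lets the consumer manage memory adaptively from actual record sizes. A minimal standalone sketch of that idea (hypothetical class and member names, not the actual Druid `MemoryBoundLinkedBlockingQueue` implementation) is:

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a blocking queue capped by the sum of item byte
// sizes instead of a fixed element count. Each item carries its own size,
// mirroring the ObjectContainer pattern used in the tests above.
public class ByteBoundedQueueSketch {
    static final class Item<T> {
        final T value;
        final long size;
        Item(T value, long size) { this.value = value; this.size = size; }
    }

    private final LinkedBlockingQueue<Item<byte[]>> delegate = new LinkedBlockingQueue<>();
    private final long byteCapacity;
    private final AtomicLong bytesUsed = new AtomicLong(0);

    ByteBoundedQueueSketch(long byteCapacity) { this.byteCapacity = byteCapacity; }

    // Admit an item only if its size fits in the remaining byte budget;
    // a CAS loop reserves the bytes before handing off to the delegate.
    boolean offer(Item<byte[]> item) {
        long current;
        do {
            current = bytesUsed.get();
            if (current + item.size > byteCapacity) {
                return false; // would exceed the byte cap
            }
        } while (!bytesUsed.compareAndSet(current, current + item.size));
        delegate.offer(item);
        return true;
    }

    // Removing an item releases its bytes back to the budget.
    Item<byte[]> take() throws InterruptedException {
        Item<byte[]> item = delegate.take();
        bytesUsed.addAndGet(-item.size);
        return item;
    }

    long remainingCapacity() { return byteCapacity - bytesUsed.get(); }

    public static void main(String[] args) throws InterruptedException {
        ByteBoundedQueueSketch q = new ByteBoundedQueueSketch(10);
        byte[] a = "item1".getBytes(StandardCharsets.UTF_8); // 5 bytes
        byte[] b = "item2".getBytes(StandardCharsets.UTF_8); // 5 bytes
        byte[] c = "item3".getBytes(StandardCharsets.UTF_8); // 5 bytes
        if (!q.offer(new Item<>(a, a.length))) throw new AssertionError();
        if (!q.offer(new Item<>(b, b.length))) throw new AssertionError();
        // Third item is rejected: 15 bytes would exceed the 10-byte cap.
        if (q.offer(new Item<>(c, c.length))) throw new AssertionError();
        if (q.remainingCapacity() != 0) throw new AssertionError();
        q.take();
        if (q.remainingCapacity() != 5) throw new AssertionError();
        System.out.println("ok");
    }
}
```

With a byte-based cap like this, the `deaggregate`-dependent "estimated record size" constants become unnecessary: admission decisions follow the observed size of each record.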
