This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9d4e8053a4a Kinesis adaptive memory management (#15360)
9d4e8053a4a is described below
commit 9d4e8053a4a84e6d106afdf8b27e62d2f396ab02
Author: zachjsh <[email protected]>
AuthorDate: Fri Jan 19 14:30:21 2024 -0500
Kinesis adaptive memory management (#15360)
### Description
Our Kinesis consumer works by using the [GetRecords
API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)
in some number of `fetchThreads`, each fetching some number of records
(`recordsPerFetch`) and each inserting into a shared buffer that can hold a
`recordBufferSize` number of records. The logic is described in our
documentation at:
https://druid.apache.org/docs/27.0.0/development/extensions-core/kinesis-ingestion/#determine-fetch-settings
This PR fixes a problem with that logic: the memory limits rely on a
hard-coded “estimated record size” of `10 KB` if `deaggregate: false` and
`1 MB` if `deaggregate: true`. There have been cases where a supervisor had
`deaggregate: true` set even though it wasn’t needed, leading to
under-utilization of memory and poor ingestion performance.
Users don’t always know whether their records are aggregated. Even if they
could figure it out, it’s better not to have to. So we’d like to eliminate
the `deaggregate` parameter, which means we need to do memory management
more adaptively, based on actual record sizes.
We take advantage of the fact that GetRecords doesn’t return more than 10 MB
(https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).

This PR:

- Eliminates `recordsPerFetch`; always use the max limit of 10000 records
(the default limit if not set).
- Eliminates `deaggregate`; always have it `true`.
- Caps `fetchThreads` to ensure that if each fetch returns the max (`10MB`),
we don't exceed our budget (`100MB` or `5% of heap`, whichever is smaller).
In practice this means `fetchThreads` will never be more than `10`. Tasks
usually don't have that many processors available to them anyway, so in
practice this shouldn't change the number of threads for many deployments.
- Adds `recordBufferSizeBytes` as a bytes-based limit, rather than a
records-based limit, for the shared queue. We do know the byte size of
Kinesis records at this point. The default is `100MB` or `10% of heap`,
whichever is smaller.
- Adds `maxBytesPerPoll` as a bytes-based limit on how much data we poll
from the shared buffer at a time. The default is `1000000` bytes.
- Deprecates `recordBufferSize`; use `recordBufferSizeBytes` instead. A
warning is logged if `recordBufferSize` is specified.
- Deprecates `maxRecordsPerPoll`; use `maxBytesPerPoll` instead. A warning
is logged if `maxRecordsPerPoll` is specified.
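The `fetchThreads` cap described above can be sketched as follows. This is a
minimal illustration of the arithmetic, not the exact Druid code; the class
and constant names here are assumptions for the sketch:

```java
// Sketch of the fetchThreads cap: each GetRecords call can return up to
// 10 MB, so bound the thread count by the fetch memory budget.
public class FetchThreadsCapSketch
{
  private static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;
  private static final long MAX_RECORD_FETCH_MEMORY = 100_000_000L;          // 100 MB
  private static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;  // 5% of heap

  static int computeFetchThreads(
      long maxHeapSizeBytes,
      Integer configuredFetchThreads,
      int availableProcessors
  )
  {
    // Default: twice the processors available to the task.
    int fetchThreads = configuredFetchThreads != null
                       ? configuredFetchThreads
                       : availableProcessors * 2;

    // Budget is the lesser of 100 MB or 5% of heap; each thread may hold
    // one 10 MB fetch at a time.
    final long memoryToUse = Math.min(
        MAX_RECORD_FETCH_MEMORY,
        (long) (maxHeapSizeBytes * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
    );
    final int maxFetchThreads = Math.max(1, (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL));

    // Don't fail older configs that set a higher value; lower it implicitly.
    return Math.min(fetchThreads, maxFetchThreads);
  }

  public static void main(String[] args)
  {
    // 8 GiB heap: 5% is ~429 MB, so the budget caps at 100 MB → at most 10 threads.
    System.out.println(computeFetchThreads(8L * 1024 * 1024 * 1024, null, 16)); // 10
    // 1 GiB heap: 5% is ~53 MB → at most 5 threads, even though 12 were configured.
    System.out.println(computeFetchThreads(1L * 1024 * 1024 * 1024, 12, 16)); // 5
  }
}
```

This is why, in practice, `fetchThreads` never exceeds 10: the 100 MB ceiling
divided by the 10 MB per-call maximum.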
Also fixed an issue where, when the record buffer is full, the fetchRecords
logic throws away the rest of the GetRecords result after
`recordBufferOfferTimeout` and starts a new shard iterator. This seems
excessively churny. Instead, wait an unbounded amount of time for the queue
to stop being full. If the queue remains full, we’ll end up right back
waiting for it after the restarted fetch.

There was also an unchecked call to `newQ::offer` in
`filterBufferAndResetBackgroundFetch`, which could cause data loss. The
return value is now checked, and the task fails if it is `false`.
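The unchecked-`offer` hazard comes from the memory-bounded queue's contract:
`offer` returns `false` rather than blocking when the byte budget is
exhausted. A condensed sketch of that contract, adapted from the
`MemoryBoundLinkedBlockingQueue` moved into `java-util` by this patch (the
class name and the explicit size parameter are simplifications):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Condensed from MemoryBoundLinkedBlockingQueue: bounded by the total byte
// size of queued items rather than the item count.
public class MemoryBoundQueueSketch<T>
{
  private final long memoryBound;
  private final AtomicLong currentMemory = new AtomicLong(0L);
  private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

  public MemoryBoundQueueSketch(long memoryBound)
  {
    this.memoryBound = memoryBound;
  }

  // Returns false (instead of blocking) when the byte budget would be
  // exceeded, which is why every caller must check the return value.
  public boolean offer(T item, long itemBytes)
  {
    if (currentMemory.addAndGet(itemBytes) <= memoryBound && queue.offer(item)) {
      return true;
    }
    currentMemory.addAndGet(-itemBytes); // roll back the reservation
    return false;
  }

  public static void main(String[] args)
  {
    MemoryBoundQueueSketch<String> q = new MemoryBoundQueueSketch<>(10);
    System.out.println(q.offer("record-1", 6)); // true: 6 <= 10
    System.out.println(q.offer("record-2", 6)); // false: 12 > 10, budget exhausted
    // Ignoring that false would silently drop record-2 — hence the new check.
  }
}
```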
### Release Note
The Kinesis ingestion memory tuning config has been greatly simplified, and
the configuration is now handled more adaptively. Summary of the changes:

- Eliminates `recordsPerFetch`; always use the max limit of 10000 records
(the default limit if not set).
- Eliminates `deaggregate`; always have it `true`.
- Caps `fetchThreads` to ensure that if each fetch returns the max (`10MB`),
we don't exceed our budget (`100MB` or `5% of heap`, whichever is smaller).
In practice this means `fetchThreads` will never be more than `10`. Tasks
usually don't have that many processors available to them anyway, so in
practice this shouldn't change the number of threads for many deployments.
- Adds `recordBufferSizeBytes` as a bytes-based limit, rather than a
records-based limit, for the shared queue. We do know the byte size of
Kinesis records at this point. The default is `100MB` or `10% of heap`,
whichever is smaller.
- Adds `maxBytesPerPoll` as a bytes-based limit on how much data we poll
from the shared buffer at a time. The default is `1000000` bytes.
- Deprecates `recordBufferSize`; use `recordBufferSizeBytes` instead. A
warning is logged if `recordBufferSize` is specified.
- Deprecates `maxRecordsPerPoll`; use `maxBytesPerPoll` instead. A warning
is logged if `maxRecordsPerPoll` is specified.
---
.../extensions-core/kinesis-ingestion.md | 27 +-
.../apache/druid/emitter/kafka/KafkaEmitter.java | 6 +-
.../kafka/MemoryBoundLinkedBlockingQueue.java | 85 ------
.../druid/indexing/kinesis/KinesisIndexTask.java | 69 +++--
.../indexing/kinesis/KinesisIndexTaskIOConfig.java | 56 +---
.../kinesis/KinesisIndexTaskTuningConfig.java | 63 ++--
.../indexing/kinesis/KinesisRecordSupplier.java | 168 ++++++-----
.../druid/indexing/kinesis/KinesisSamplerSpec.java | 6 +-
.../kinesis/supervisor/KinesisSupervisor.java | 10 +-
.../supervisor/KinesisSupervisorIOConfig.java | 4 +-
.../supervisor/KinesisSupervisorTuningConfig.java | 15 +-
.../indexing/kinesis/KinesisIOConfigTest.java | 42 +--
.../kinesis/KinesisIndexTaskSerdeTest.java | 6 +-
.../indexing/kinesis/KinesisIndexTaskTest.java | 33 +--
.../kinesis/KinesisIndexTaskTuningConfigTest.java | 24 +-
.../kinesis/KinesisRecordSupplierTest.java | 212 ++++----------
.../supervisor/KinesisSupervisorIOConfigTest.java | 8 +-
.../kinesis/supervisor/KinesisSupervisorTest.java | 106 ++++---
.../TestModifiedKinesisIndexTaskTuningConfig.java | 6 +
.../common/MemoryBoundLinkedBlockingQueue.java | 211 ++++++++++++++
.../common/MemoryBoundLinkedBlockingQueueTest.java | 319 +++++++++++++++++++++
.../druid-models/ingestion-spec/ingestion-spec.tsx | 30 +-
website/.spelling | 1 +
23 files changed, 907 insertions(+), 600 deletions(-)
diff --git a/docs/development/extensions-core/kinesis-ingestion.md
b/docs/development/extensions-core/kinesis-ingestion.md
index 5071b153366..7d0709d99cf 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -241,11 +241,9 @@ The following table outlines the configuration options for
`ioConfig`:
|`completionTimeout`|ISO 8601 period|The length of time to wait before Druid
declares a publishing task has failed and terminates it. If this is set too
low, your tasks may never publish. The publishing clock for a task begins
roughly after `taskDuration` elapses.|No|PT6H|
|`lateMessageRejectionPeriod`|ISO 8601 period|Configure tasks to reject
messages with timestamps earlier than this period before the task is created.
For example, if `lateMessageRejectionPeriod` is set to `PT1H` and the
supervisor creates a task at `2016-01-01T12:00Z`, messages with timestamps
earlier than `2016-01-01T11:00Z` are dropped. This may help prevent concurrency
issues if your data stream has late messages and you have multiple pipelines
that need to operate on the same segment [...]
|`earlyMessageRejectionPeriod`|ISO 8601 period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
`taskDuration`. For example, if `earlyMessageRejectionPeriod` is set to `PT1H`,
the `taskDuration` is set to `PT1H` and the supervisor creates a task at
`2016-01-01T12:00Z`. Messages with timestamps later than `2016-01-01T14:00Z`
are dropped. **Note:** Tasks sometimes run past their task duration, for
example, in cases of supervisor failover. [...]
-|`recordsPerFetch`|Integer|The number of records to request per call to fetch
records from Kinesis.|No| See [Determine fetch
settings](#determine-fetch-settings) for defaults.|
|`fetchDelayMillis`|Integer|Time in milliseconds to wait between subsequent
calls to fetch records from Kinesis. See [Determine fetch
settings](#determine-fetch-settings).|No|0|
|`awsAssumedRoleArn`|String|The AWS assumed role to use for additional
permissions.|No||
|`awsExternalId`|String|The AWS external ID to use for additional
permissions.|No||
-|`deaggregate`|Boolean|Whether to use the deaggregate function of the Kinesis
Client Library (KCL).|No||
|`autoScalerConfig`|Object|Defines autoscaling behavior for Kinesis ingest
tasks. See [Task autoscaler properties](#task-autoscaler-properties) for more
information.|No|null|
### Task autoscaler properties
@@ -406,7 +404,7 @@ The following table outlines the configuration options for
`tuningConfig`:
|`chatRetries`|Integer|The number of times Druid retries HTTP requests to
indexing tasks before considering tasks unresponsive.|No|8|
|`httpTimeout`|ISO 8601 period|The period of time to wait for a HTTP response
from an indexing task.|No|PT10S|
|`shutdownTimeout`|ISO 8601 period|The period of time to wait for the
supervisor to attempt a graceful shutdown of tasks before exiting.|No|PT80S|
-|`recordBufferSize`|Integer|The size of the buffer (number of events) Druid
uses between the Kinesis fetch threads and the main ingestion thread.|No|See
[Determine fetch settings](#determine-fetch-settings) for defaults.|
+|`recordBufferSizeBytes`|Integer| The size of the buffer (heap memory bytes)
Druid uses between the Kinesis fetch threads and the main ingestion thread.|No|
See [Determine fetch settings](#determine-fetch-settings) for defaults.|
|`recordBufferOfferTimeout`|Integer|The number of milliseconds to wait for
space to become available in the buffer before timing out.|No|5000|
|`recordBufferFullWait`|Integer|The number of milliseconds to wait for the
buffer to drain before Druid attempts to fetch records from Kinesis
again.|No|5000|
|`fetchThreads`|Integer|The size of the pool of threads fetching data from
Kinesis. There is no benefit in having more threads than Kinesis shards.|No|
`procs * 2`, where `procs` is the number of processors available to the task.|
@@ -414,7 +412,7 @@ The following table outlines the configuration options for
`tuningConfig`:
|`logParseExceptions`|Boolean|If `true`, Druid logs an error message when a
parsing exception occurs, containing information about the row where the error
occurred.|No|`false`|
|`maxParseExceptions`|Integer|The maximum number of parse exceptions that can
occur before the task halts ingestion and fails. Overridden if
`reportParseExceptions` is set.|No|unlimited|
|`maxSavedParseExceptions`|Integer|When a parse exception occurs, Druid keeps
track of the most recent parse exceptions. `maxSavedParseExceptions` limits the
number of saved exception instances. These saved exceptions are available after
the task finishes in the [task completion
report](../../ingestion/tasks.md#task-reports). Overridden if
`reportParseExceptions` is set.|No|0|
-|`maxRecordsPerPoll`|Integer|The maximum number of records to be fetched from
buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll,
Max(bufferSize, 1))`.|No| See [Determine fetch
settings](#determine-fetch-settings) for defaults.|
+|`maxBytesPerPoll`|Integer| The maximum number of bytes to be fetched from
buffer per poll. At least one record is polled from the buffer regardless of
this config.|No| 1000000 bytes|
|`repartitionTransitionDuration`|ISO 8601 period|When shards are split or
merged, the supervisor recomputes shard to task group mappings. The supervisor
also signals any running tasks created under the old mappings to stop early at
current time + `repartitionTransitionDuration`. Stopping the tasks early allows
Druid to begin reading from the new shards more quickly. The repartition
transition wait time controlled by this property gives the stream additional
time to write records to the n [...]
|`offsetFetchPeriod`|ISO 8601 period|Determines how often the supervisor
queries Kinesis and the indexing tasks to fetch current offsets and calculate
lag. If the user-specified value is below the minimum value of PT5S, the
supervisor ignores the value and uses the minimum value instead.|No|PT30S|
|`useListShards`|Boolean|Indicates if `listShards` API of AWS Kinesis SDK can
be used to prevent `LimitExceededException` during ingestion. You must set the
necessary `IAM` permissions.|No|`false`|
@@ -656,25 +654,22 @@ For more detail, see [Segment size
optimization](../../operations/segment-optimi
Kinesis indexing tasks fetch records using `fetchThreads` threads.
If `fetchThreads` is higher than the number of Kinesis shards, the excess
threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a
Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard,
with a delay between fetches
of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size
`recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size
`recordBufferSizeBytes`.
The main runner thread for each task polls up to `maxRecordsPerPoll` records
from the queue at once.
-When using Kinesis Producer Library's aggregation feature, that is when
[`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual
records.
-
The default values for these parameters are:
- `fetchThreads`: Twice the number of processors available to the task. The
number of processors available to the task
is the total number of processors on the server, divided by
`druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data
record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB
of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a
warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
- `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is
smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and
1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever
is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and
1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated
records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap,
whichever is smaller.
+- `maxBytesPerPoll`: 1000000.
Kinesis places the following restrictions on calls to fetch records:
@@ -697,8 +692,6 @@ Kinesis stream.
The Kinesis indexing service supports de-aggregation of multiple rows packed
into a single record by the Kinesis
Producer Library's aggregate method for more efficient data transfer.
-To enable this feature, set `deaggregate` to true in your `ioConfig` when
submitting a supervisor spec.
-
## Resharding
[Resharding](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html)
is an advanced operation that lets you adjust the number of shards in a stream
to adapt to changes in the rate of data flowing through a stream.
diff --git
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 87183d62fe7..661543f707c 100644
---
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType;
-import
org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
+import org.apache.druid.java.util.common.MemoryBoundLinkedBlockingQueue;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
@@ -176,7 +176,7 @@ public class KafkaEmitter implements Emitter
private void sendToKafka(final String topic,
MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
{
- ObjectContainer<String> objectToSend;
+ MemoryBoundLinkedBlockingQueue.ObjectContainer<String> objectToSend;
try {
while (true) {
objectToSend = recordQueue.take();
@@ -206,7 +206,7 @@ public class KafkaEmitter implements Emitter
String resultJson = jsonMapper.writeValueAsString(map);
- ObjectContainer<String> objectContainer = new ObjectContainer<>(
+ MemoryBoundLinkedBlockingQueue.ObjectContainer<String> objectContainer
= new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(
resultJson,
StringUtils.toUtf8(resultJson).length
);
diff --git
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java
deleted file mode 100644
index fb6cae8ee95..00000000000
---
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.emitter.kafka;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Similar to LinkedBlockingQueue but can be bounded by the total byte size of
the items present in the queue
- * rather than number of items.
- */
-public class MemoryBoundLinkedBlockingQueue<T>
-{
- private final long memoryBound;
- private final AtomicLong currentMemory;
- private final LinkedBlockingQueue<ObjectContainer<T>> queue;
-
- public MemoryBoundLinkedBlockingQueue(long memoryBound)
- {
- this.memoryBound = memoryBound;
- this.currentMemory = new AtomicLong(0L);
- this.queue = new LinkedBlockingQueue<>();
- }
-
- // returns true/false depending on whether item was added or not
- public boolean offer(ObjectContainer<T> item)
- {
- final long itemLength = item.getSize();
-
- if (currentMemory.addAndGet(itemLength) <= memoryBound) {
- if (queue.offer(item)) {
- return true;
- }
- }
- currentMemory.addAndGet(-itemLength);
- return false;
- }
-
- // blocks until at least one item is available to take
- public ObjectContainer<T> take() throws InterruptedException
- {
- final ObjectContainer<T> ret = queue.take();
- currentMemory.addAndGet(-ret.getSize());
- return ret;
- }
-
- public static class ObjectContainer<T>
- {
- private T data;
- private long size;
-
- ObjectContainer(T data, long size)
- {
- this.data = data;
- this.size = size;
- }
-
- public T getData()
- {
- return data;
- }
-
- public long getSize()
- {
- return size;
- }
- }
-}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index 1aa6d5b2e3b..fb019f10030 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -49,6 +49,10 @@ import java.util.Set;
public class KinesisIndexTask extends SeekableStreamIndexTask<String, String,
ByteEntity>
{
private static final String TYPE = "index_kinesis";
+
+ // GetRecords returns maximum 10MB per call
+ //
(https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
+ private static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;
private static final Logger log = new Logger(KinesisIndexTask.class);
private final boolean useListShards;
@@ -84,6 +88,14 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String, By
public TaskStatus runTask(TaskToolbox toolbox)
{
this.runtimeInfo = toolbox.getAdjustedRuntimeInfo();
+ if (getTuningConfig().getRecordBufferSizeConfigured() != null) {
+ log.warn("The 'recordBufferSize' config property of the kinesis tuning
config has been deprecated. "
+ + "Please use 'recordBufferSizeBytes'.");
+ }
+ if (getTuningConfig().getMaxRecordsPerPollConfigured() != null) {
+ log.warn("The 'maxRecordsPerPoll' config property of the kinesis tuning
config has been deprecated. "
+ + "Please use 'maxBytesPerPoll'.");
+ }
return super.runTask(toolbox);
}
@@ -105,21 +117,18 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String, By
{
KinesisIndexTaskIOConfig ioConfig = ((KinesisIndexTaskIOConfig)
super.ioConfig);
KinesisIndexTaskTuningConfig tuningConfig =
((KinesisIndexTaskTuningConfig) super.tuningConfig);
+ final int recordBufferSizeBytes =
+
tuningConfig.getRecordBufferSizeBytesOrDefault(runtimeInfo.getMaxHeapSizeBytes());
final int fetchThreads = computeFetchThreads(runtimeInfo,
tuningConfig.getFetchThreads());
- final int recordsPerFetch =
ioConfig.getRecordsPerFetchOrDefault(runtimeInfo.getMaxHeapSizeBytes(),
fetchThreads);
- final int recordBufferSize =
-
tuningConfig.getRecordBufferSizeOrDefault(runtimeInfo.getMaxHeapSizeBytes(),
ioConfig.isDeaggregate());
- final int maxRecordsPerPoll =
tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate());
+ final int maxBytesPerPoll = tuningConfig.getMaxBytesPerPollOrDefault();
log.info(
- "Starting record supplier with fetchThreads [%d], fetchDelayMillis
[%d], recordsPerFetch [%d], "
- + "recordBufferSize [%d], maxRecordsPerPoll [%d], deaggregate [%s].",
+ "Starting record supplier with fetchThreads [%d], fetchDelayMillis
[%d], "
+ + "recordBufferSizeBytes [%d], maxBytesPerPoll [%d]",
fetchThreads,
ioConfig.getFetchDelayMillis(),
- recordsPerFetch,
- recordBufferSize,
- maxRecordsPerPoll,
- ioConfig.isDeaggregate()
+ recordBufferSizeBytes,
+ maxBytesPerPoll
);
return new KinesisRecordSupplier(
@@ -129,19 +138,24 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String, By
ioConfig.getAwsAssumedRoleArn(),
ioConfig.getAwsExternalId()
),
- recordsPerFetch,
ioConfig.getFetchDelayMillis(),
fetchThreads,
- ioConfig.isDeaggregate(),
- recordBufferSize,
+ recordBufferSizeBytes,
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
- maxRecordsPerPoll,
+ maxBytesPerPoll,
false,
useListShards
);
}
+ @Override
+ @JsonProperty
+ public KinesisIndexTaskTuningConfig getTuningConfig()
+ {
+ return (KinesisIndexTaskTuningConfig) super.getTuningConfig();
+ }
+
@Override
@JsonProperty("ioConfig")
public KinesisIndexTaskIOConfig getIOConfig()
@@ -179,15 +193,38 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String, By
}
@VisibleForTesting
- static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer
configuredFetchThreads)
+ static int computeFetchThreads(
+ final RuntimeInfo runtimeInfo,
+ final Integer configuredFetchThreads
+ )
{
- final int fetchThreads;
+ int fetchThreads;
if (configuredFetchThreads != null) {
fetchThreads = configuredFetchThreads;
} else {
fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
}
+ // Each fetchThread can return upto 10MB at a time
+ //
(https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html),
cap fetchThreads so that
+ // we don't exceed more than the least of 100MB or 5% of heap at a time.
Don't fail if fetchThreads specified
+ // is greater than this as to not cause failure for older configurations,
but log warning in this case, and lower
+ // fetchThreads implicitly.
+ final long memoryToUse = Math.min(
+ KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
+ (long) (runtimeInfo.getMaxHeapSizeBytes() *
KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
+ );
+ int maxFetchThreads = Math.max(
+ 1,
+ (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL)
+ );
+ if (fetchThreads > maxFetchThreads) {
+ if (configuredFetchThreads != null) {
+ log.warn("fetchThreads [%d] being lowered to [%d]",
configuredFetchThreads, maxFetchThreads);
+ }
+ fetchThreads = maxFetchThreads;
+ }
+
Preconditions.checkArgument(
fetchThreads > 0,
"Must have at least one background fetch thread for the record
supplier"
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
index 0572c317006..881d68ba896 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
import org.apache.druid.data.input.InputFormat;
import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
@@ -41,21 +40,19 @@ public class KinesisIndexTaskIOConfig extends
SeekableStreamIndexTaskIOConfig<St
* Together with {@link
KinesisIndexTaskTuningConfig#MAX_RECORD_BUFFER_MEMORY}, don't take up more than
200MB
* per task.
*/
- private static final int MAX_RECORD_FETCH_MEMORY = 100_000_000;
+ public static final int MAX_RECORD_FETCH_MEMORY = 100_000_000;
/**
* Together with {@link
KinesisIndexTaskTuningConfig#RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION}, don't
take up more
* than 15% of the heap.
*/
- private static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;
+ public static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;
private final String endpoint;
- private final Integer recordsPerFetch;
private final int fetchDelayMillis;
private final String awsAssumedRoleArn;
private final String awsExternalId;
- private final boolean deaggregate;
@JsonCreator
public KinesisIndexTaskIOConfig(
@@ -79,11 +76,9 @@ public class KinesisIndexTaskIOConfig extends
SeekableStreamIndexTaskIOConfig<St
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("endpoint") String endpoint,
- @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
- @JsonProperty("awsExternalId") String awsExternalId,
- @JsonProperty("deaggregate") boolean deaggregate
+ @JsonProperty("awsExternalId") String awsExternalId
)
{
super(
@@ -105,11 +100,9 @@ public class KinesisIndexTaskIOConfig extends
SeekableStreamIndexTaskIOConfig<St
);
this.endpoint = Preconditions.checkNotNull(endpoint, "endpoint");
- this.recordsPerFetch = recordsPerFetch;
this.fetchDelayMillis = fetchDelayMillis != null ? fetchDelayMillis :
DEFAULT_FETCH_DELAY_MILLIS;
this.awsAssumedRoleArn = awsAssumedRoleArn;
this.awsExternalId = awsExternalId;
- this.deaggregate = deaggregate;
}
public KinesisIndexTaskIOConfig(
@@ -122,11 +115,9 @@ public class KinesisIndexTaskIOConfig extends
SeekableStreamIndexTaskIOConfig<St
DateTime maximumMessageTime,
InputFormat inputFormat,
String endpoint,
- Integer recordsPerFetch,
Integer fetchDelayMillis,
String awsAssumedRoleArn,
- String awsExternalId,
- boolean deaggregate
+ String awsExternalId
)
{
this(
@@ -142,11 +133,9 @@ public class KinesisIndexTaskIOConfig extends
SeekableStreamIndexTaskIOConfig<St
maximumMessageTime,
inputFormat,
endpoint,
- recordsPerFetch,
fetchDelayMillis,
awsAssumedRoleArn,
- awsExternalId,
- deaggregate
+ awsExternalId
);
}
@@ -215,32 +204,6 @@ public class KinesisIndexTaskIOConfig extends
SeekableStreamIndexTaskIOConfig<St
return endpoint;
}
- @Nullable
- @JsonProperty("recordsPerFetch")
- @JsonInclude(JsonInclude.Include.NON_NULL)
- public Integer getRecordsPerFetchConfigured()
- {
- return recordsPerFetch;
- }
-
- public int getRecordsPerFetchOrDefault(final long maxHeapSize, final int
fetchThreads)
- {
- if (recordsPerFetch != null) {
- return recordsPerFetch;
- } else {
- final long memoryToUse = Math.min(
- MAX_RECORD_FETCH_MEMORY,
- (long) (maxHeapSize * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
- );
-
- final int assumedRecordSize = deaggregate
- ?
KinesisIndexTaskTuningConfig.ASSUMED_RECORD_SIZE_AGGREGATE
- :
KinesisIndexTaskTuningConfig.ASSUMED_RECORD_SIZE;
-
- return Ints.checkedCast(Math.max(1, memoryToUse / assumedRecordSize /
fetchThreads));
- }
- }
-
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public int getFetchDelayMillis()
@@ -262,13 +225,6 @@ public class KinesisIndexTaskIOConfig extends
SeekableStreamIndexTaskIOConfig<St
return awsExternalId;
}
- @JsonProperty
- @JsonInclude(JsonInclude.Include.NON_DEFAULT)
- public boolean isDeaggregate()
- {
- return deaggregate;
- }
-
@Override
public String toString()
{
@@ -280,11 +236,9 @@ public class KinesisIndexTaskIOConfig extends
SeekableStreamIndexTaskIOConfig<St
", minimumMessageTime=" + getMinimumMessageTime() +
", maximumMessageTime=" + getMaximumMessageTime() +
", endpoint='" + endpoint + '\'' +
- ", recordsPerFetch=" + recordsPerFetch +
", fetchDelayMillis=" + fetchDelayMillis +
", awsAssumedRoleArn='" + awsAssumedRoleArn + '\'' +
", awsExternalId='" + awsExternalId + '\'' +
- ", deaggregate=" + deaggregate +
'}';
}
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index 06d8cf53723..0c3cd717752 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -57,13 +56,14 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
private static final int DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT = 5000;
private static final int DEFAULT_RECORD_BUFFER_FULL_WAIT = 5000;
private static final int DEFAULT_MAX_RECORDS_PER_POLL = 100;
- private static final int DEFAULT_MAX_RECORDS_PER_POLL_AGGREGATE = 1;
-
+ private static final int DEFAULT_MAX_BYTES_PER_POLL = 1_000_000;
private final Integer recordBufferSize;
+ private final Integer recordBufferSizeBytes;
private final int recordBufferOfferTimeout;
private final int recordBufferFullWait;
private final Integer fetchThreads;
private final Integer maxRecordsPerPoll;
+ private final Integer maxBytesPerPoll;
public KinesisIndexTaskTuningConfig(
@Nullable AppendableIndexSpec appendableIndexSpec,
@@ -81,7 +81,8 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
Long handoffConditionTimeout,
Boolean resetOffsetAutomatically,
Boolean skipSequenceNumberAvailabilityCheck,
- Integer recordBufferSize,
+ @Deprecated @Nullable Integer recordBufferSize,
+ @Nullable Integer recordBufferSizeBytes,
Integer recordBufferOfferTimeout,
Integer recordBufferFullWait,
Integer fetchThreads,
@@ -89,7 +90,8 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
@Nullable Boolean logParseExceptions,
@Nullable Integer maxParseExceptions,
@Nullable Integer maxSavedParseExceptions,
- @Nullable Integer maxRecordsPerPoll,
+ @Deprecated @Nullable Integer maxRecordsPerPoll,
+ @Nullable Integer maxBytesPerPoll,
@Nullable Period intermediateHandoffPeriod
)
{
@@ -116,12 +118,14 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
maxSavedParseExceptions
);
this.recordBufferSize = recordBufferSize;
+ this.recordBufferSizeBytes = recordBufferSizeBytes;
this.recordBufferOfferTimeout = recordBufferOfferTimeout == null
? DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT
: recordBufferOfferTimeout;
this.recordBufferFullWait = recordBufferFullWait == null ? DEFAULT_RECORD_BUFFER_FULL_WAIT : recordBufferFullWait;
this.fetchThreads = fetchThreads; // we handle this being null later
this.maxRecordsPerPoll = maxRecordsPerPoll;
+ this.maxBytesPerPoll = maxBytesPerPoll;
Preconditions.checkArgument(
!(super.isResetOffsetAutomatically() && super.isSkipSequenceNumberAvailabilityCheck()),
@@ -145,7 +149,8 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
@JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean skipSequenceNumberAvailabilityCheck,
- @JsonProperty("recordBufferSize") Integer recordBufferSize,
+ @JsonProperty("recordBufferSize") @Deprecated @Nullable Integer recordBufferSize,
+ @JsonProperty("recordBufferSizeBytes") Integer recordBufferSizeBytes,
@JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
@JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
@JsonProperty("fetchThreads") Integer fetchThreads,
@@ -153,7 +158,8 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
- @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
+ @JsonProperty("maxRecordsPerPoll") @Deprecated @Nullable Integer maxRecordsPerPoll,
+ @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll,
@JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod
)
{
@@ -174,6 +180,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
resetOffsetAutomatically,
skipSequenceNumberAvailabilityCheck,
recordBufferSize,
+ recordBufferSizeBytes,
recordBufferOfferTimeout,
recordBufferFullWait,
fetchThreads,
@@ -182,6 +189,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
maxParseExceptions,
maxSavedParseExceptions,
maxRecordsPerPoll,
+ maxBytesPerPoll,
intermediateHandoffPeriod
);
}
@@ -194,18 +202,23 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
return recordBufferSize;
}
- public int getRecordBufferSizeOrDefault(final long maxHeapSize, final boolean deaggregate)
+ @Nullable
+ @JsonProperty("recordBufferSizeBytes")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Integer getRecordBufferSizeBytesConfigured()
{
- if (recordBufferSize != null) {
- return recordBufferSize;
+ return recordBufferSizeBytes;
+ }
+
+ public int getRecordBufferSizeBytesOrDefault(final long maxHeapSize)
+ {
+ if (recordBufferSizeBytes != null) {
+ return recordBufferSizeBytes;
} else {
- final long memoryToUse = Math.min(
+ return (int) Math.min(
MAX_RECORD_BUFFER_MEMORY,
(long) (maxHeapSize * RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION)
);
-
- final int assumedRecordSize = deaggregate ? ASSUMED_RECORD_SIZE_AGGREGATE : ASSUMED_RECORD_SIZE;
- return Ints.checkedCast(Math.max(1, memoryToUse / assumedRecordSize));
}
}
@@ -237,9 +250,17 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
return maxRecordsPerPoll;
}
- public int getMaxRecordsPerPollOrDefault(final boolean deaggregate)
+ @Nullable
+ @JsonProperty("maxBytesPerPoll")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Integer getMaxBytesPerPollConfigured()
+ {
+ return maxBytesPerPoll;
+ }
+
+ public int getMaxBytesPerPollOrDefault()
{
- return deaggregate ? DEFAULT_MAX_RECORDS_PER_POLL_AGGREGATE : DEFAULT_MAX_RECORDS_PER_POLL;
+ return maxBytesPerPoll != null ? maxBytesPerPoll : DEFAULT_MAX_BYTES_PER_POLL;
}
@Override
@@ -262,6 +283,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
isResetOffsetAutomatically(),
isSkipSequenceNumberAvailabilityCheck(),
getRecordBufferSizeConfigured(),
+ getRecordBufferSizeBytesConfigured(),
getRecordBufferOfferTimeout(),
getRecordBufferFullWait(),
getFetchThreads(),
@@ -270,6 +292,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getMaxRecordsPerPollConfigured(),
+ getMaxBytesPerPollConfigured(),
getIntermediateHandoffPeriod()
);
}
@@ -288,9 +311,11 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
}
KinesisIndexTaskTuningConfig that = (KinesisIndexTaskTuningConfig) o;
return Objects.equals(recordBufferSize, that.recordBufferSize) &&
+ Objects.equals(recordBufferSizeBytes, that.recordBufferSizeBytes) &&
recordBufferOfferTimeout == that.recordBufferOfferTimeout &&
recordBufferFullWait == that.recordBufferFullWait &&
Objects.equals(maxRecordsPerPoll, that.maxRecordsPerPoll) &&
+ Objects.equals(maxBytesPerPoll, that.maxBytesPerPoll) &&
Objects.equals(fetchThreads, that.fetchThreads);
}
@@ -300,10 +325,12 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
return Objects.hash(
super.hashCode(),
recordBufferSize,
+ recordBufferSizeBytes,
recordBufferOfferTimeout,
recordBufferFullWait,
fetchThreads,
- maxRecordsPerPoll
+ maxRecordsPerPoll,
+ maxBytesPerPoll
);
}
@@ -324,6 +351,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
", skipSequenceNumberAvailabilityCheck=" + isSkipSequenceNumberAvailabilityCheck() +
", recordBufferSize=" + recordBufferSize +
+ ", recordBufferSizeBytes=" + recordBufferSizeBytes +
", recordBufferOfferTimeout=" + recordBufferOfferTimeout +
", recordBufferFullWait=" + recordBufferFullWait +
", fetchThreads=" + fetchThreads +
@@ -332,6 +360,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
", maxParseExceptions=" + getMaxParseExceptions() +
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", maxRecordsPerPoll=" + maxRecordsPerPoll +
+ ", maxBytesPerPoll=" + maxBytesPerPoll +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
'}';
}
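The `getRecordBufferSizeBytesOrDefault` change above sizes the buffer in bytes from the task's heap rather than dividing memory by an assumed record size. A minimal standalone sketch of that computation follows; the 100 MB cap and 10% heap fraction are illustrative assumptions standing in for `MAX_RECORD_BUFFER_MEMORY` and `RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION`, whose actual values live in the Druid source:

```java
// Illustrative sketch only: the constant values here are assumptions, not
// necessarily the real values used by KinesisIndexTaskTuningConfig.
public class BufferSizeDefaults
{
  private static final long MAX_RECORD_BUFFER_MEMORY = 100_000_000L;          // assumed 100 MB cap
  private static final double RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION = 0.1;   // assumed 10% of heap

  // Mirrors the shape of getRecordBufferSizeBytesOrDefault(maxHeapSize) when
  // recordBufferSizeBytes was not explicitly configured.
  public static int defaultRecordBufferSizeBytes(long maxHeapSize)
  {
    return (int) Math.min(
        MAX_RECORD_BUFFER_MEMORY,
        (long) (maxHeapSize * RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION)
    );
  }

  public static void main(String[] args)
  {
    System.out.println(defaultRecordBufferSizeBytes(1_000_000_000L)); // 1 GB heap hits the cap
    System.out.println(defaultRecordBufferSizeBytes(256_000_000L));   // smaller heap takes the fraction
  }
}
```

The point of the min() is that small heaps scale the buffer down proportionally, while very large heaps stop growing it past a fixed ceiling.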
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index f0645f8f82c..36047ce429d 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -46,11 +46,11 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
import org.apache.druid.common.aws.AWSClientUtil;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.aws.AWSCredentialsUtils;
import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -58,6 +58,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.MemoryBoundLinkedBlockingQueue;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -78,12 +79,10 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -212,7 +211,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
// used for retrying on InterruptedException
GetRecordsResult recordsResult = null;
OrderedPartitionableRecord<String, String, ByteEntity> currRecord;
-
+ long recordBufferOfferWaitMillis;
try {
if (shardIterator == null) {
@@ -228,45 +227,47 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
recordsResult = null;
- if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
+ recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+ while (!records.offer(
+     new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, 0),
+     recordBufferOfferWaitMillis,
+     TimeUnit.MILLISECONDS
+ )) {
  log.warn("Kinesis records are being processed slower than they are fetched. "
      + "OrderedPartitionableRecord buffer full, retrying in [%,dms].", recordBufferFullWait);
- scheduleBackgroundFetch(recordBufferFullWait);
+ recordBufferOfferWaitMillis = recordBufferFullWait;
}
return;
}
- recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator(
-     shardIterator).withLimit(recordsPerFetch));
+ recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator));
currentLagMillis = recordsResult.getMillisBehindLatest();
// list will come back empty if there are no records
for (Record kinesisRecord : recordsResult.getRecords()) {
-
final List<ByteEntity> data;
+ if (deaggregateHandle == null || getDataHandle == null) {
+ throw new ISE("deaggregateHandle or getDataHandle is null!");
+ }
- if (deaggregate) {
- if (deaggregateHandle == null || getDataHandle == null) {
- throw new ISE("deaggregateHandle or getDataHandle is null!");
- }
-
- data = new ArrayList<>();
+ data = new ArrayList<>();
- final List userRecords = (List) deaggregateHandle.invokeExact(
- Collections.singletonList(kinesisRecord)
- );
+ final List userRecords = (List) deaggregateHandle.invokeExact(
+ Collections.singletonList(kinesisRecord)
+ );
- for (Object userRecord : userRecords) {
-   data.add(new ByteEntity((ByteBuffer) getDataHandle.invoke(userRecord)));
- }
- } else {
-   data = Collections.singletonList(new ByteEntity(kinesisRecord.getData()));
+ int recordSize = 0;
+ for (Object userRecord : userRecords) {
+   ByteEntity byteEntity = new ByteEntity((ByteBuffer) getDataHandle.invoke(userRecord));
+   recordSize += byteEntity.getBuffer().array().length;
+   data.add(byteEntity);
}
+
currRecord = new OrderedPartitionableRecord<>(
streamPartition.getStream(),
streamPartition.getPartitionId(),
@@ -277,10 +278,11 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
  if (log.isTraceEnabled()) {
    log.trace(
-       "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s",
+       "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferByteSize[%d] / bufferRemainingByteCapacity[%d]: %s",
currRecord.getStream(),
currRecord.getPartitionId(),
currRecord.getSequenceNumber(),
+ records.byteSize(),
records.remainingCapacity(),
currRecord.getData()
.stream()
@@ -292,24 +294,18 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
    );
  }
- // If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
- // from this message and back off for a bit to let the buffer drain before retrying.
- if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
+ recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+ while (!records.offer(
+     new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, recordSize),
+     recordBufferOfferWaitMillis,
+     TimeUnit.MILLISECONDS
+ )) {
  log.warn(
      "Kinesis records are being processed slower than they are fetched. "
      + "OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms].",
      recordBufferFullWait
  );
-
- shardIterator = kinesis.getShardIterator(
- currRecord.getStream(),
- currRecord.getPartitionId(),
- ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
- currRecord.getSequenceNumber()
- ).getShardIterator();
-
- scheduleBackgroundFetch(recordBufferFullWait);
- return;
+ recordBufferOfferWaitMillis = recordBufferFullWait;
}
}
@@ -399,15 +395,12 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
private final MethodHandle getDataHandle;
private final AmazonKinesis kinesis;
-
- private final int recordsPerFetch;
private final int fetchDelayMillis;
- private final boolean deaggregate;
private final int recordBufferOfferTimeout;
private final int recordBufferFullWait;
- private final int maxRecordsPerPoll;
+ private final int maxBytesPerPoll;
private final int fetchThreads;
- private final int recordBufferSize;
+ private final int recordBufferSizeBytes;
private final boolean useEarliestSequenceNumber;
private final boolean useListShards;
@@ -415,7 +408,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
  private final ConcurrentMap<StreamPartition<String>, PartitionResource> partitionResources =
      new ConcurrentHashMap<>();
- private BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> records;
+ private MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> records;
  private final boolean backgroundFetchEnabled;
  private volatile boolean closed = false;
@@ -423,56 +416,48 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
public KinesisRecordSupplier(
AmazonKinesis amazonKinesis,
- int recordsPerFetch,
int fetchDelayMillis,
int fetchThreads,
- boolean deaggregate,
- int recordBufferSize,
+ int recordBufferSizeBytes,
int recordBufferOfferTimeout,
int recordBufferFullWait,
- int maxRecordsPerPoll,
+ int maxBytesPerPoll,
boolean useEarliestSequenceNumber,
boolean useListShards
)
{
Preconditions.checkNotNull(amazonKinesis);
this.kinesis = amazonKinesis;
- this.recordsPerFetch = recordsPerFetch;
this.fetchDelayMillis = fetchDelayMillis;
- this.deaggregate = deaggregate;
this.recordBufferOfferTimeout = recordBufferOfferTimeout;
this.recordBufferFullWait = recordBufferFullWait;
- this.maxRecordsPerPoll = maxRecordsPerPoll;
+ this.maxBytesPerPoll = maxBytesPerPoll;
this.fetchThreads = fetchThreads;
- this.recordBufferSize = recordBufferSize;
+ this.recordBufferSizeBytes = recordBufferSizeBytes;
this.useEarliestSequenceNumber = useEarliestSequenceNumber;
this.useListShards = useListShards;
this.backgroundFetchEnabled = fetchThreads > 0;
- // the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache.
- // The work around here is to use reflection to find the deaggregate function in the classpath. See details on the
- // docs page for more information on how to use deaggregation
- if (deaggregate) {
-   try {
-     Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
-     MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+ // The deaggregate function is implemented by the amazon-kinesis-client, whose license was formerly not compatible
+ // with Apache. The code here avoids the license issue by using reflection, but is no longer necessary since
+ // amazon-kinesis-client is now Apache-licensed and is now a dependency of Druid. This code could safely be
+ // modified to use regular calls rather than reflection.
+ try {
+   Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
+   MethodHandles.Lookup lookup = MethodHandles.publicLookup();
-     Method deaggregateMethod = kclUserRecordclass.getMethod("deaggregate", List.class);
-     Method getDataMethod = kclUserRecordclass.getMethod("getData");
+   Method deaggregateMethod = kclUserRecordclass.getMethod("deaggregate", List.class);
+   Method getDataMethod = kclUserRecordclass.getMethod("getData");
-     deaggregateHandle = lookup.unreflect(deaggregateMethod);
-     getDataHandle = lookup.unreflect(getDataMethod);
-   }
-   catch (ClassNotFoundException e) {
-     throw new ISE(e, "cannot find class[com.amazonaws.services.kinesis.clientlibrary.types.UserRecord], "
-                      + "note that when using deaggregate=true, you must provide the Kinesis Client Library jar in the classpath");
-   }
-   catch (Exception e) {
-     throw new RuntimeException(e);
-   }
- } else {
-   deaggregateHandle = null;
-   getDataHandle = null;
+   deaggregateHandle = lookup.unreflect(deaggregateMethod);
+   getDataHandle = lookup.unreflect(getDataMethod);
+ }
+ catch (ClassNotFoundException e) {
+   throw new ISE(e, "cannot find class[com.amazonaws.services.kinesis.clientlibrary.types.UserRecord], "
+                    + "note that when using deaggregate=true, you must provide the Kinesis Client Library jar in the classpath");
+ }
+ catch (Exception e) {
+   throw new RuntimeException(e);
}
if (backgroundFetchEnabled) {
@@ -488,7 +473,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
);
}
- records = new LinkedBlockingQueue<>(recordBufferSize);
+ records = new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
}
public static AmazonKinesis getAmazonKinesisClient(
@@ -635,23 +620,19 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
  start();
  try {
-   int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll);
-
-   List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = new ArrayList<>(expectedSize);
+   List<MemoryBoundLinkedBlockingQueue.ObjectContainer<OrderedPartitionableRecord<String, String, ByteEntity>>> polledRecords = new ArrayList<>();
-   Queues.drain(
-       records,
+   records.drain(
        polledRecords,
-       expectedSize,
+       maxBytesPerPoll,
        timeout,
        TimeUnit.MILLISECONDS
    );
-   polledRecords = polledRecords.stream()
-                                .filter(x -> partitionResources.containsKey(x.getStreamPartition()))
-                                .collect(Collectors.toList());
-
-   return polledRecords;
+   return polledRecords.stream()
+                       .filter(x -> partitionResources.containsKey(x.getData().getStreamPartition()))
+                       .map(MemoryBoundLinkedBlockingQueue.ObjectContainer::getData)
+                       .collect(Collectors.toList());
}
catch (InterruptedException e) {
log.warn(e, "Interrupted while polling");
@@ -1059,11 +1040,22 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
  }
  // filter records in buffer and only retain ones whose partition was not seeked
- BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
+ MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ =
+     new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
  records.stream()
-        .filter(x -> !partitions.contains(x.getStreamPartition()))
-        .forEachOrdered(newQ::offer);
+        .filter(x -> !partitions.contains(x.getData().getStreamPartition()))
+        .forEachOrdered(x -> {
+          if (!newQ.offer(x)) {
+            // this should never really happen in practice but adding check here for safety.
+            throw DruidException.defensive("Failed to insert item to new queue when resetting background fetch. "
+                + "[stream: '%s', partitionId: '%s', sequenceNumber: '%s']",
+                x.getData().getStream(),
+                x.getData().getPartitionId(),
+                x.getData().getSequenceNumber()
+            );
+          }
+        });
records = newQ;
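The diff above swaps the count-bounded `LinkedBlockingQueue` for Druid's `MemoryBoundLinkedBlockingQueue`, which bounds the buffer by the byte sizes callers attach to each element. The following is a simplified, illustrative reimplementation (not the actual Druid class) of the two operations the record supplier relies on: `offer` with a byte-weighted container, and `drain` up to a byte budget:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for org.apache.druid.java.util.common.MemoryBoundLinkedBlockingQueue:
// a queue bounded by declared byte sizes instead of element count.
public class ByteBoundedQueue<T>
{
  public static class Container<T>
  {
    final T data;
    final long size;

    public Container(T data, long size)
    {
      this.data = data;
      this.size = size;
    }

    public T getData()
    {
      return data;
    }
  }

  private final long memoryBound;
  private final AtomicLong currentMemory = new AtomicLong(0);
  private final LinkedBlockingQueue<Container<T>> queue = new LinkedBlockingQueue<>();

  public ByteBoundedQueue(long memoryBound)
  {
    this.memoryBound = memoryBound;
  }

  // Rejects the element if its declared size would push the tracked total past the bound.
  public boolean offer(Container<T> item)
  {
    if (currentMemory.addAndGet(item.size) <= memoryBound && queue.offer(item)) {
      return true;
    }
    currentMemory.addAndGet(-item.size);
    return false;
  }

  // Drains elements in FIFO order, stopping before the accumulated bytes exceed byteLimit.
  public int drain(List<Container<T>> sink, long byteLimit)
  {
    long bytes = 0;
    int count = 0;
    Container<T> head;
    while ((head = queue.peek()) != null && bytes + head.size <= byteLimit) {
      queue.poll();
      currentMemory.addAndGet(-head.size);
      bytes += head.size;
      sink.add(head);
      count++;
    }
    return count;
  }

  public static void main(String[] args)
  {
    ByteBoundedQueue<String> q = new ByteBoundedQueue<>(10);
    q.offer(new Container<>("a", 4));
    q.offer(new Container<>("b", 4));
    System.out.println(q.offer(new Container<>("c", 4))); // rejected: would exceed 10 bytes
    List<Container<String>> out = new ArrayList<>();
    System.out.println(q.drain(out, 4)); // drains only "a" within the 4-byte budget
  }
}
```

This is why the patch can delete `recordsPerFetch` and `maxRecordsPerPoll`: with byte accounting, a burst of large records and a trickle of tiny ones both fill the same bounded budget. The real class's blocking-offer and drain-timeout semantics are omitted here for brevity.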
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
index 29e909eb0bb..81f8b774f04 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
@@ -73,14 +73,12 @@ public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
ioConfig.getAwsAssumedRoleArn(),
ioConfig.getAwsExternalId()
),
- ioConfig.getRecordsPerFetch() != null ? ioConfig.getRecordsPerFetch() : DEFAULT_RECORDS_PER_FETCH,
  ioConfig.getFetchDelayMillis(),
  1,
- ioConfig.isDeaggregate(),
- tuningConfig.getRecordBufferSizeOrDefault(Runtime.getRuntime().maxMemory(), ioConfig.isDeaggregate()),
+ tuningConfig.getRecordBufferSizeBytesOrDefault(Runtime.getRuntime().maxMemory()),
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
- tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate()),
+ tuningConfig.getMaxBytesPerPollOrDefault(),
ioConfig.isUseEarliestSequenceNumber(),
tuningConfig.isUseListShards()
);
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index accd8316f66..a142f414762 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -143,11 +143,9 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
maximumMessageTime,
ioConfig.getInputFormat(),
ioConfig.getEndpoint(),
- ioConfig.getRecordsPerFetch(),
ioConfig.getFetchDelayMillis(),
ioConfig.getAwsAssumedRoleArn(),
- ioConfig.getAwsExternalId(),
- ioConfig.isDeaggregate()
+ ioConfig.getAwsExternalId()
);
}
@@ -197,14 +195,12 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
ioConfig.getAwsAssumedRoleArn(),
ioConfig.getAwsExternalId()
),
- 0, // no records-per-fetch, it is not used
ioConfig.getFetchDelayMillis(),
0, // skip starting background fetch, it is not used
- ioConfig.isDeaggregate(),
- taskTuningConfig.getRecordBufferSizeOrDefault(Runtime.getRuntime().maxMemory(), ioConfig.isDeaggregate()),
+ taskTuningConfig.getRecordBufferSizeBytesOrDefault(Runtime.getRuntime().maxMemory()),
  taskTuningConfig.getRecordBufferOfferTimeout(),
  taskTuningConfig.getRecordBufferFullWait(),
- taskTuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate()),
+ taskTuningConfig.getMaxBytesPerPollOrDefault(),
ioConfig.isUseEarliestSequenceNumber(),
spec.getSpec().getTuningConfig().isUseListShards()
);
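The commit message also describes capping `fetchThreads` so that concurrent GetRecords responses at the 10 MB service maximum cannot overrun the record buffer. A hypothetical sketch of such a cap follows; the method name and exact formula are illustrative, not the code in this commit:

```java
// Hypothetical sketch: never run more fetch threads than could fill the
// record buffer if every in-flight GetRecords call returned the maximum.
public class FetchThreadsCap
{
  private static final long MAX_BYTES_PER_FETCH = 10_000_000L; // 10 MB GetRecords per-call limit

  public static int capFetchThreads(int requestedFetchThreads, long recordBufferSizeBytes)
  {
    int cap = (int) Math.max(1, recordBufferSizeBytes / MAX_BYTES_PER_FETCH);
    return Math.min(requestedFetchThreads, cap);
  }

  public static void main(String[] args)
  {
    System.out.println(capFetchThreads(8, 100_000_000L)); // buffer fits 10 max fetches: keep 8
    System.out.println(capFetchThreads(8, 20_000_000L));  // buffer fits 2 max fetches: cap to 2
  }
}
```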
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index a568aea263c..9910f22a349 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -71,12 +71,12 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
  @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
  @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
  @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
- @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
+ @JsonProperty("recordsPerFetch") @Deprecated Integer recordsPerFetch,
  @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
  @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
  @JsonProperty("awsExternalId") String awsExternalId,
  @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
- @JsonProperty("deaggregate") boolean deaggregate
+ @JsonProperty("deaggregate") @Deprecated boolean deaggregate
)
{
super(
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
index 357bc9e57ca..a0a68c14bc0 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
@@ -74,6 +74,8 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
null,
null,
null,
+ null,
+ null,
null
);
}
@@ -98,14 +100,16 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
  @JsonProperty("chatRetries") Long chatRetries,
  @JsonProperty("httpTimeout") Period httpTimeout,
  @JsonProperty("shutdownTimeout") Period shutdownTimeout,
- @JsonProperty("recordBufferSize") Integer recordBufferSize,
+ @JsonProperty("recordBufferSize") @Deprecated @Nullable Integer recordBufferSize,
+ @JsonProperty("recordBufferSizeBytes") Integer recordBufferSizeBytes,
  @JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
  @JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
  @JsonProperty("fetchThreads") Integer fetchThreads,
  @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
  @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
  @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
- @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
+ @JsonProperty("maxRecordsPerPoll") @Deprecated @Nullable Integer maxRecordsPerPoll,
+ @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll,
  @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
  @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@@ -129,6 +133,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
resetOffsetAutomatically,
skipSequenceNumberAvailabilityCheck,
recordBufferSize,
+ recordBufferSizeBytes,
recordBufferOfferTimeout,
recordBufferFullWait,
fetchThreads,
@@ -137,6 +142,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
maxParseExceptions,
maxSavedParseExceptions,
maxRecordsPerPoll,
+ maxBytesPerPoll,
intermediateHandoffPeriod
);
@@ -225,7 +231,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
", chatRetries=" + chatRetries +
", httpTimeout=" + httpTimeout +
", shutdownTimeout=" + shutdownTimeout +
- ", recordBufferSize=" + getRecordBufferSizeConfigured() +
+ ", recordBufferSizeBytes=" + getRecordBufferSizeBytesConfigured() +
", recordBufferOfferTimeout=" + getRecordBufferOfferTimeout() +
", recordBufferFullWait=" + getRecordBufferFullWait() +
", fetchThreads=" + getFetchThreads() +
@@ -234,6 +240,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
", maxParseExceptions=" + getMaxParseExceptions() +
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", maxRecordsPerPoll=" + getMaxRecordsPerPollConfigured() +
+ ", maxBytesPerPoll=" + getMaxBytesPerPollConfigured() +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", repartitionTransitionDuration=" +
getRepartitionTransitionDuration() +
", useListShards=" + isUseListShards() +
@@ -260,6 +267,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
isResetOffsetAutomatically(),
isSkipSequenceNumberAvailabilityCheck(),
getRecordBufferSizeConfigured(),
+ getRecordBufferSizeBytesConfigured(),
getRecordBufferOfferTimeout(),
getRecordBufferFullWait(),
getFetchThreads(),
@@ -268,6 +276,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getMaxRecordsPerPollConfigured(),
+ getMaxBytesPerPollConfigured(),
getIntermediateHandoffPeriod()
);
}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
index 0d6af4343aa..3162b2ea0ee 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
@@ -93,13 +93,10 @@ public class KinesisIOConfigTest
Assert.assertTrue(config.isUseTransaction());
Assert.assertFalse("minimumMessageTime",
config.getMinimumMessageTime().isPresent());
Assert.assertEquals(config.getEndpoint(),
"kinesis.us-east-1.amazonaws.com");
- Assert.assertNull(config.getRecordsPerFetchConfigured());
- Assert.assertEquals(config.getRecordsPerFetchOrDefault(1_000_000_000, 4),
1250);
Assert.assertEquals(config.getFetchDelayMillis(), 0);
Assert.assertEquals(Collections.emptySet(),
config.getStartSequenceNumbers().getExclusivePartitions());
Assert.assertNull(config.getAwsAssumedRoleArn());
Assert.assertNull(config.getAwsExternalId());
- Assert.assertFalse(config.isDeaggregate());
}
@Test
@@ -115,11 +112,9 @@ public class KinesisIOConfigTest
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n"
+ " \"endpoint\": \"kinesis.us-east-2.amazonaws.com\",\n"
- + " \"recordsPerFetch\": 1000,\n"
+ " \"fetchDelayMillis\": 1000,\n"
+ " \"awsAssumedRoleArn\": \"role\",\n"
- + " \"awsExternalId\": \"awsexternalid\",\n"
- + " \"deaggregate\": true\n"
+ + " \"awsExternalId\": \"awsexternalid\"\n"
+ "}";
KinesisIndexTaskIOConfig config = (KinesisIndexTaskIOConfig) mapper.readValue(
@@ -150,11 +145,9 @@ public class KinesisIOConfigTest
Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get());
Assert.assertEquals(config.getEndpoint(), "kinesis.us-east-2.amazonaws.com");
Assert.assertEquals(config.getStartSequenceNumbers().getExclusivePartitions(), ImmutableSet.of("0"));
- Assert.assertEquals(1000, (int) config.getRecordsPerFetchConfigured());
Assert.assertEquals(1000, config.getFetchDelayMillis());
Assert.assertEquals("role", config.getAwsAssumedRoleArn());
Assert.assertEquals("awsexternalid", config.getAwsExternalId());
- Assert.assertTrue(config.isDeaggregate());
}
@Test
@@ -272,11 +265,9 @@ public class KinesisIOConfigTest
DateTimes.nowUtc(),
null,
"endpoint",
- 1000,
2000,
"awsAssumedRoleArn",
- "awsExternalId",
- true
+ "awsExternalId"
);
final byte[] json = mapper.writeValueAsBytes(currentConfig);
@@ -302,11 +293,9 @@ public class KinesisIOConfigTest
Assert.assertEquals(currentConfig.getMinimumMessageTime(), oldConfig.getMinimumMessageTime());
Assert.assertEquals(currentConfig.getMaximumMessageTime(), oldConfig.getMaximumMessageTime());
Assert.assertEquals(currentConfig.getEndpoint(), oldConfig.getEndpoint());
- Assert.assertEquals((int) currentConfig.getRecordsPerFetchConfigured(), oldConfig.getRecordsPerFetch());
Assert.assertEquals(currentConfig.getFetchDelayMillis(), oldConfig.getFetchDelayMillis());
Assert.assertEquals(currentConfig.getAwsAssumedRoleArn(), oldConfig.getAwsAssumedRoleArn());
Assert.assertEquals(currentConfig.getAwsExternalId(), oldConfig.getAwsExternalId());
- Assert.assertEquals(currentConfig.isDeaggregate(), oldConfig.isDeaggregate());
}
@Test
@@ -324,11 +313,9 @@ public class KinesisIOConfigTest
DateTimes.nowUtc(),
DateTimes.nowUtc(),
"endpoint",
- 1000,
2000,
"awsAssumedRoleArn",
- "awsExternalId",
- true
+ "awsExternalId"
);
final byte[] json = oldMapper.writeValueAsBytes(oldConfig);
@@ -349,11 +336,9 @@ public class KinesisIOConfigTest
Assert.assertEquals(oldConfig.getMinimumMessageTime(), currentConfig.getMinimumMessageTime());
Assert.assertEquals(oldConfig.getMaximumMessageTime(), currentConfig.getMaximumMessageTime());
Assert.assertEquals(oldConfig.getEndpoint(), currentConfig.getEndpoint());
- Assert.assertEquals(oldConfig.getRecordsPerFetch(), (int) currentConfig.getRecordsPerFetchConfigured());
Assert.assertEquals(oldConfig.getFetchDelayMillis(), currentConfig.getFetchDelayMillis());
Assert.assertEquals(oldConfig.getAwsAssumedRoleArn(), currentConfig.getAwsAssumedRoleArn());
Assert.assertEquals(oldConfig.getAwsExternalId(), currentConfig.getAwsExternalId());
- Assert.assertEquals(oldConfig.isDeaggregate(), currentConfig.isDeaggregate());
}
private static class OldKinesisIndexTaskIoConfig implements IOConfig
@@ -366,12 +351,9 @@ public class KinesisIOConfigTest
private final Optional<DateTime> minimumMessageTime;
private final Optional<DateTime> maximumMessageTime;
private final String endpoint;
- private final Integer recordsPerFetch;
private final Integer fetchDelayMillis;
-
private final String awsAssumedRoleArn;
private final String awsExternalId;
- private final boolean deaggregate;
@JsonCreator
private OldKinesisIndexTaskIoConfig(
@@ -383,11 +365,9 @@ public class KinesisIOConfigTest
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("endpoint") String endpoint,
- @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
- @JsonProperty("awsExternalId") String awsExternalId,
- @JsonProperty("deaggregate") boolean deaggregate
+ @JsonProperty("awsExternalId") String awsExternalId
)
{
this.baseSequenceName = baseSequenceName;
@@ -398,11 +378,9 @@ public class KinesisIOConfigTest
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
this.endpoint = endpoint;
- this.recordsPerFetch = recordsPerFetch;
this.fetchDelayMillis = fetchDelayMillis;
this.awsAssumedRoleArn = awsAssumedRoleArn;
this.awsExternalId = awsExternalId;
- this.deaggregate = deaggregate;
}
@JsonProperty
@@ -453,12 +431,6 @@ public class KinesisIOConfigTest
return endpoint;
}
- @JsonProperty
- public int getRecordsPerFetch()
- {
- return recordsPerFetch;
- }
-
@JsonProperty
public int getFetchDelayMillis()
{
@@ -476,11 +448,5 @@ public class KinesisIOConfigTest
{
return awsExternalId;
}
-
- @JsonProperty
- public boolean isDeaggregate()
- {
- return deaggregate;
- }
}
}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index 2cba3f54187..da04e3ab0a6 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -76,6 +76,8 @@ public class KinesisIndexTaskSerdeTest
null,
null,
null,
+ null,
+ null,
null
);
private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig(
@@ -90,9 +92,7 @@ public class KinesisIndexTaskSerdeTest
"endpoint",
null,
null,
- null,
- null,
- false
+ null
);
private static final String ACCESS_KEY = "test-access-key";
private static final String SECRET_KEY = "test-secret-key";
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 69516979f3e..e14b6679b09 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -184,6 +184,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
private Long maxTotalRows = null;
private final Period intermediateHandoffPeriod = null;
private int maxRecordsPerPoll;
+ private int maxBytesPerPoll;
@BeforeClass
public static void setupClass()
@@ -218,6 +219,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
doHandoff = true;
reportsFile = File.createTempFile("KinesisIndexTaskTestReports-" + System.currentTimeMillis(), "json");
maxRecordsPerPoll = 1;
+ maxBytesPerPoll = 1_000_000;
recordSupplier = mock(KinesisRecordSupplier.class);
@@ -562,6 +564,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
// as soon as any segment has more than one record, incremental publishing should happen
maxRowsPerSegment = 2;
maxRecordsPerPoll = 1;
+ maxBytesPerPoll = 1_000_000;
recordSupplier.assign(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
@@ -779,9 +782,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
"awsEndpoint",
null,
null,
- null,
- null,
- false
+ null
)
);
@@ -843,9 +844,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
"awsEndpoint",
null,
null,
- null,
- null,
- false
+ null
)
);
@@ -1697,6 +1696,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
{
maxRowsPerSegment = 2;
maxRecordsPerPoll = 1;
+ maxBytesPerPoll = 1_000_000;
List<OrderedPartitionableRecord<String, String, ByteEntity>> records = clone(SINGLE_PARTITION_RECORDS);
@@ -1935,9 +1935,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
"awsEndpoint",
null,
null,
- null,
- null,
- false
+ null
),
context
);
@@ -2099,9 +2097,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
"awsEndpoint",
null,
null,
- null,
- null,
- false
+ null
),
context
);
@@ -2250,10 +2246,15 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
public void testComputeFetchThreads()
{
final DruidProcessingConfigTest.MockRuntimeInfo runtimeInfo =
- new DruidProcessingConfigTest.MockRuntimeInfo(3, 1000, 2000);
+ new DruidProcessingConfigTest.MockRuntimeInfo(3, 1000, 10_000_000_000L);
Assert.assertEquals(6, KinesisIndexTask.computeFetchThreads(runtimeInfo, null));
Assert.assertEquals(2, KinesisIndexTask.computeFetchThreads(runtimeInfo, 2));
+
+ final DruidProcessingConfigTest.MockRuntimeInfo runtimeInfo2 =
+ new DruidProcessingConfigTest.MockRuntimeInfo(3, 1000, 1_000_000_000);
+ Assert.assertEquals(5, KinesisIndexTask.computeFetchThreads(runtimeInfo2, null));
+ Assert.assertEquals(5, KinesisIndexTask.computeFetchThreads(runtimeInfo2, 6));
Assert.assertThrows(
IllegalArgumentException.class,
() -> KinesisIndexTask.computeFetchThreads(runtimeInfo, 0)
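For readers of this patch, the capping behavior exercised by testComputeFetchThreads above can be sketched as follows. This is a sketch under assumptions, not Druid's actual implementation: the 10 MB figure is the GetRecords response cap mentioned in the commit message, and the 5% heap budget and "2x processors" default are inferred from the test's expected values.

```java
// Sketch of fetch-thread capping: ensure that worst-case in-flight GetRecords
// responses (10 MB each) fit within an assumed fraction of the heap.
public class FetchThreadsSketch
{
  private static final long MAX_FETCH_BYTES = 10_000_000L; // GetRecords response cap
  private static final double HEAP_FRACTION = 0.05;        // assumed memory budget

  static int computeFetchThreads(long heapBytes, int processors, Integer configured)
  {
    if (configured != null && configured <= 0) {
      throw new IllegalArgumentException("fetchThreads must be positive");
    }
    // Cap the thread count so that cap * 10 MB stays within the assumed budget.
    final int cap = (int) Math.max(1, (long) (heapBytes * HEAP_FRACTION) / MAX_FETCH_BYTES);
    final int requested = configured != null ? configured : processors * 2;
    return Math.min(requested, cap);
  }

  public static void main(String[] args)
  {
    // Mirrors the assertions in testComputeFetchThreads above.
    System.out.println(computeFetchThreads(10_000_000_000L, 3, null)); // 6
    System.out.println(computeFetchThreads(1_000_000_000L, 3, null));  // 5
    System.out.println(computeFetchThreads(1_000_000_000L, 3, 6));     // 5
  }
}
```

With a 10 GB heap the cap (50) never binds, so the default 2x-processors value (6) wins; with a 1 GB heap the cap of 5 overrides even an explicit request for 6.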
@@ -2297,9 +2298,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
"awsEndpoint",
null,
null,
- null,
- null,
- false
+ null
),
null
);
@@ -2358,10 +2357,12 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
null,
null,
null,
+ null,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
maxRecordsPerPoll,
+ maxBytesPerPoll,
intermediateHandoffPeriod
);
return createTask(taskId, dataSchema, ioConfig, tuningConfig, context);
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index cd99521c18e..b61c5cf2ae4 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -77,8 +77,10 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertFalse(config.isReportParseExceptions());
Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
- Assert.assertNull(config.getRecordBufferSizeConfigured());
- Assert.assertEquals(10000, config.getRecordBufferSizeOrDefault(1_000_000_000, false));
+ Assert.assertNull(config.getRecordBufferSizeBytesConfigured());
+ Assert.assertEquals(100_000_000, config.getRecordBufferSizeBytesOrDefault(2_000_000_000));
+ Assert.assertEquals(100_000_000, config.getRecordBufferSizeBytesOrDefault(1_000_000_000));
+ Assert.assertEquals(10_000_000, config.getRecordBufferSizeBytesOrDefault(100_000_000));
Assert.assertEquals(5000, config.getRecordBufferOfferTimeout());
Assert.assertEquals(5000, config.getRecordBufferFullWait());
Assert.assertNull(config.getFetchThreads());
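The default expected by the asserts above can be sketched as a heap-relative buffer size. This is a sketch under assumptions inferred purely from the three expected values (10% of heap, capped at 100 MB); the real Druid logic may use different constants.

```java
// Sketch of a byte-based record buffer default derived from available heap.
public class RecordBufferSketch
{
  static long recordBufferSizeBytesOrDefault(Long configured, long maxHeapBytes)
  {
    if (configured != null) {
      return configured;
    }
    // Assumed default: 10% of heap, never more than 100 MB.
    return Math.min(100_000_000L, maxHeapBytes / 10);
  }

  public static void main(String[] args)
  {
    System.out.println(recordBufferSizeBytesOrDefault(null, 2_000_000_000L)); // 100000000
    System.out.println(recordBufferSizeBytesOrDefault(null, 1_000_000_000L)); // 100000000
    System.out.println(recordBufferSizeBytesOrDefault(null, 100_000_000L));   // 10000000
  }
}
```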
@@ -98,7 +100,7 @@ public class KinesisIndexTaskTuningConfigTest
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
- + " \"recordBufferSize\": 1000,\n"
+ + " \"recordBufferSizeBytes\": 1000,\n"
+ " \"recordBufferOfferTimeout\": 500,\n"
+ " \"recordBufferFullWait\": 500,\n"
+ " \"resetOffsetAutomatically\": false,\n"
@@ -125,8 +127,8 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertTrue(config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
- Assert.assertEquals(1000, (int) config.getRecordBufferSizeConfigured());
- Assert.assertEquals(1000, config.getRecordBufferSizeOrDefault(1_000_000_000, false));
+ Assert.assertEquals(1000, (int) config.getRecordBufferSizeBytesConfigured());
+ Assert.assertEquals(1000, config.getRecordBufferSizeBytesOrDefault(1_000_000_000));
Assert.assertEquals(500, config.getRecordBufferOfferTimeout());
Assert.assertEquals(500, config.getRecordBufferFullWait());
Assert.assertEquals(2, (int) config.getFetchThreads());
@@ -153,6 +155,7 @@ public class KinesisIndexTaskTuningConfigTest
5L,
true,
false,
+ null,
1000,
1000,
500,
@@ -162,6 +165,7 @@ public class KinesisIndexTaskTuningConfigTest
500,
500,
6000,
+ 1_000_000,
new Period("P3D")
);
@@ -190,7 +194,9 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured());
+ Assert.assertEquals(base.getRecordBufferSizeBytesConfigured(), deserialized.getRecordBufferSizeBytesConfigured());
Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured());
+ Assert.assertEquals(base.getMaxBytesPerPollConfigured(), deserialized.getMaxBytesPerPollConfigured());
}
@Test
@@ -212,6 +218,7 @@ public class KinesisIndexTaskTuningConfigTest
5L,
true,
false,
+ null,
1000,
1000,
500,
@@ -220,6 +227,7 @@ public class KinesisIndexTaskTuningConfigTest
false,
500,
500,
+ 1_000_000,
6000,
new Period("P3D")
);
@@ -247,7 +255,7 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
- Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured());
+ Assert.assertEquals(base.getRecordBufferSizeBytesConfigured(), deserialized.getRecordBufferSizeBytesConfigured());
Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured());
}
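The new maxBytesPerPoll knob tested above changes polling from a record-count budget to a byte budget. A minimal sketch of that idea, with illustrative names that are not Druid's actual code: drain buffered records until the byte budget is reached, always returning at least one record so the consumer makes progress on oversized records.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of byte-budgeted polling from an in-memory record buffer.
public class BytePollSketch
{
  static List<byte[]> poll(List<byte[]> buffer, long maxBytesPerPoll)
  {
    final List<byte[]> polled = new ArrayList<>();
    long bytes = 0;
    for (byte[] record : buffer) {
      // Stop once the next record would exceed the budget, but never return
      // an empty batch when the buffer is non-empty.
      if (!polled.isEmpty() && bytes + record.length > maxBytesPerPoll) {
        break;
      }
      polled.add(record);
      bytes += record.length;
    }
    return polled;
  }

  public static void main(String[] args)
  {
    List<byte[]> buffer = new ArrayList<>();
    buffer.add(new byte[600_000]);
    buffer.add(new byte[600_000]);
    buffer.add(new byte[600_000]);
    // With a 1 MB budget only the first 600 KB record fits in one poll.
    System.out.println(poll(buffer, 1_000_000).size()); // 1
  }
}
```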
@@ -301,6 +309,7 @@ public class KinesisIndexTaskTuningConfigTest
null,
null,
null,
+ null,
1000,
500,
500,
@@ -309,6 +318,7 @@ public class KinesisIndexTaskTuningConfigTest
null,
null,
10,
+ 1_000_000,
null,
null,
null,
@@ -327,7 +337,7 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(IndexSpec.DEFAULT, copy.getIndexSpec());
Assert.assertTrue(copy.isReportParseExceptions());
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
- Assert.assertEquals(1000, (int) copy.getRecordBufferSizeConfigured());
+ Assert.assertEquals(1000, (int) copy.getRecordBufferSizeBytesConfigured());
Assert.assertEquals(500, copy.getRecordBufferOfferTimeout());
Assert.assertEquals(500, copy.getRecordBufferFullWait());
Assert.assertEquals(2, (int) copy.getFetchThreads());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index af0755fcd4b..5fcf81139eb 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -147,8 +147,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
throw new RuntimeException(e);
}
}
-
- private static int recordsPerFetch;
private static AmazonKinesis kinesis;
private static ListShardsResult listShardsResult0;
private static ListShardsResult listShardsResult1;
@@ -180,7 +178,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
getRecordsResult1 = createMock(GetRecordsResult.class);
shard0 = createMock(Shard.class);
shard1 = createMock(Shard.class);
- recordsPerFetch = 1;
}
@After
@@ -219,14 +216,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- recordsPerFetch,
0,
2,
- false,
100,
5000,
5000,
- 5,
+ 1_000_000,
true,
false
);
@@ -278,14 +273,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- recordsPerFetch,
0,
2,
- false,
100,
5000,
5000,
- 5,
+ 1_000_000,
true,
true
);
@@ -312,7 +305,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(expectedRequest1, capturedRequest1.getValue());
}
- private static GetRecordsRequest generateGetRecordsReq(String shardIterator, int limit)
+ private static GetRecordsRequest generateGetRecordsReq(String shardIterator)
+ {
+ return new GetRecordsRequest().withShardIterator(shardIterator);
+ }
+
+ private static GetRecordsRequest generateGetRecordsWithLimitReq(String shardIterator, int limit)
{
return new GetRecordsRequest().withShardIterator(shardIterator).withLimit(limit);
}
@@ -326,87 +324,9 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
.collect(Collectors.toList());
}
- @Test
- public void testPoll() throws InterruptedException
- {
- recordsPerFetch = 100;
-
- EasyMock.expect(kinesis.getShardIterator(
- EasyMock.anyObject(),
- EasyMock.eq(SHARD_ID0),
- EasyMock.anyString(),
- EasyMock.anyString()
- )).andReturn(
- getShardIteratorResult0).anyTimes();
-
- EasyMock.expect(kinesis.getShardIterator(
- EasyMock.anyObject(),
- EasyMock.eq(SHARD_ID1),
- EasyMock.anyString(),
- EasyMock.anyString()
- )).andReturn(
- getShardIteratorResult1).anyTimes();
-
- EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
- EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
- .andReturn(getRecordsResult0)
- .anyTimes();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch)))
- .andReturn(getRecordsResult1)
- .anyTimes();
- EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once();
- EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once();
- EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
- EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes();
- EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once();
- EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once();
-
- replayAll();
-
- Set<StreamPartition<String>> partitions = ImmutableSet.of(
- StreamPartition.of(STREAM, SHARD_ID0),
- StreamPartition.of(STREAM, SHARD_ID1)
- );
-
-
- recordSupplier = new KinesisRecordSupplier(
- kinesis,
- recordsPerFetch,
- 0,
- 2,
- false,
- 100,
- 5000,
- 5000,
- 100,
- true,
- false
- );
-
- recordSupplier.assign(partitions);
- recordSupplier.seekToEarliest(partitions);
- recordSupplier.start();
-
- while (recordSupplier.bufferSize() < 12) {
- Thread.sleep(100);
- }
-
- List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = cleanRecords(recordSupplier.poll(
- POLL_TIMEOUT_MILLIS));
-
- verifyAll();
-
- Assert.assertEquals(partitions, recordSupplier.getAssignment());
- Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS));
- Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag());
- }
-
@Test
public void testPollWithKinesisInternalFailure() throws InterruptedException
{
- recordsPerFetch = 100;
-
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID0),
@@ -425,10 +345,10 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
.andReturn(getRecordsResult0)
.anyTimes();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR)))
.andReturn(getRecordsResult1)
.anyTimes();
AmazonServiceException getException = new AmazonServiceException("InternalFailure");
@@ -460,14 +380,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- recordsPerFetch,
0,
2,
- false,
- 100,
+ 10_000,
5000,
5000,
- 100,
+ 1_000_000,
true,
false
);
@@ -493,8 +411,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
@Test
public void testPollWithKinesisNonRetryableFailure() throws InterruptedException
{
- recordsPerFetch = 100;
-
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID0),
@@ -508,7 +424,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
getException.setStatusCode(400);
getException.setServiceName("AmazonKinesis");
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
.andThrow(getException)
.once();
@@ -521,14 +437,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- recordsPerFetch,
0,
1,
- false,
100,
5000,
5000,
- 100,
+ 1_000_000,
true,
false
);
@@ -556,8 +470,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
public void testSeek()
throws InterruptedException
{
- recordsPerFetch = 100;
-
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID0),
@@ -576,10 +488,10 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
.andReturn(getRecordsResult0)
.anyTimes();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR)))
.andReturn(getRecordsResult1)
.anyTimes();
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS.subList(1, SHARD0_RECORDS.size())).once();
@@ -600,14 +512,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- recordsPerFetch,
0,
2,
- false,
- 100,
+ 10_000,
5000,
5000,
- 100,
+ 1_000_000,
true,
false
);
@@ -636,8 +546,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
public void testSeekToLatest()
throws InterruptedException
{
- recordsPerFetch = 100;
-
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID0),
@@ -668,14 +576,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- recordsPerFetch,
0,
2,
- false,
100,
5000,
5000,
- 100,
+ 1_000_000,
true,
false
);
@@ -703,14 +609,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- 1,
0,
2,
- false,
100,
5000,
5000,
- 5,
+ 1_000_000,
true,
false
);
@@ -725,7 +629,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
throws InterruptedException
{
// tests that after doing a seek, the now invalid records in buffer is cleaned up properly
- recordsPerFetch = 100;
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
@@ -745,10 +648,10 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).once();
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).once();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR)))
.andReturn(getRecordsResult1)
.once();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
.andReturn(getRecordsResult0)
.once();
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS.subList(5, SHARD1_RECORDS.size())).once();
@@ -766,14 +669,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- recordsPerFetch,
0,
2,
- false,
- 100,
+ 10_000,
5000,
5000,
- 1,
+ 1_000_000,
true,
false
);
@@ -816,8 +717,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
@Test
public void testPollDeaggregate() throws InterruptedException
{
- recordsPerFetch = 100;
-
EasyMock.expect(kinesis.getShardIterator(
EasyMock.anyObject(),
EasyMock.eq(SHARD_ID0),
@@ -836,10 +735,10 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
.andReturn(getRecordsResult0)
.anyTimes();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR)))
.andReturn(getRecordsResult1)
.anyTimes();
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once();
@@ -859,14 +758,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- recordsPerFetch,
0,
2,
- true,
- 100,
+ 10_000,
5000,
5000,
- 100,
+ 1_000_000,
true,
false
);
@@ -922,7 +819,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).once();
AmazonClientException ex = new AmazonClientException(new IOException());
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 1000)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD0_ITERATOR, 1000)))
.andThrow(ex)
.andReturn(getRecordsResult0)
.once();
@@ -935,14 +832,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- recordsPerFetch,
0,
2,
- true,
- 100,
+ 10_000,
5000,
5000,
- 100,
+ 1_000_000,
true,
false
);
@@ -961,7 +856,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).times(1);
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 1000)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD0_ITERATOR, 1000)))
.andReturn(getRecordsResult0)
.times(1);
@@ -972,14 +867,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- recordsPerFetch,
0,
2,
- true,
- 100,
+ 10_000,
5000,
5000,
- 100,
+ 1_000_000,
true,
false
);
@@ -1033,12 +926,18 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR)))
.andReturn(getRecordsResult0)
.anyTimes();
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch)))
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR)))
.andReturn(getRecordsResult1)
.anyTimes();
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD0_ITERATOR, 1)))
+ .andReturn(getRecordsResult0)
+ .anyTimes();
+ EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD1_ITERATOR, 1)))
+ .andReturn(getRecordsResult1)
+ .anyTimes();
EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).times(2);
EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS_EMPTY).times(2);
EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes();
@@ -1055,14 +954,12 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
recordSupplier = new KinesisRecordSupplier(
kinesis,
- recordsPerFetch,
0,
2,
- true,
- 100,
+ 10_000,
5000,
5000,
- 100,
+ 1_000_000,
true,
false
);
@@ -1110,17 +1007,16 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
public void testIsOffsetAvailable()
{
AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
- KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
- recordsPerFetch,
- 0,
- 2,
- false,
- 100,
- 5000,
- 5000,
- 5,
- true,
- false
+ KinesisRecordSupplier target = new KinesisRecordSupplier(
+ mockKinesis,
+ 0,
+ 2,
+ 100,
+ 5000,
+ 5000,
+ 1_000_000,
+ true,
+ false
);
StreamPartition<String> partition = new StreamPartition<>(STREAM, SHARD_ID0);
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
index fabde1852aa..4de35cf5e5d 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
@@ -70,11 +70,9 @@ public class KinesisSupervisorIOConfigTest
Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent());
Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent());
Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent());
- Assert.assertNull(config.getRecordsPerFetch());
Assert.assertEquals(0, config.getFetchDelayMillis());
Assert.assertNull(config.getAwsAssumedRoleArn());
Assert.assertNull(config.getAwsExternalId());
- Assert.assertFalse(config.isDeaggregate());
}
@Test
@@ -94,11 +92,9 @@ public class KinesisSupervisorIOConfigTest
+ " \"completionTimeout\": \"PT45M\",\n"
+ " \"lateMessageRejectionPeriod\": \"PT1H\",\n"
+ " \"earlyMessageRejectionPeriod\": \"PT1H\",\n"
- + " \"recordsPerFetch\": 4000,\n"
+ " \"fetchDelayMillis\": 1000,\n"
+ " \"awsAssumedRoleArn\": \"role\",\n"
- + " \"awsExternalId\": \"awsexternalid\",\n"
- + " \"deaggregate\": true\n"
+ + " \"awsExternalId\": \"awsexternalid\"\n"
+ "}";
KinesisSupervisorIOConfig config = mapper.readValue(
@@ -117,11 +113,9 @@ public class KinesisSupervisorIOConfigTest
Assert.assertEquals(Duration.standardMinutes(45), config.getCompletionTimeout());
Assert.assertEquals(Duration.standardHours(1), config.getLateMessageRejectionPeriod().get());
Assert.assertEquals(Duration.standardHours(1), config.getEarlyMessageRejectionPeriod().get());
- Assert.assertEquals((Integer) 4000, config.getRecordsPerFetch());
Assert.assertEquals(1000, config.getFetchDelayMillis());
Assert.assertEquals("role", config.getAwsAssumedRoleArn());
Assert.assertEquals("awsexternalid", config.getAwsExternalId());
- Assert.assertTrue(config.isDeaggregate());
}
@Test
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index c9b6406bd3f..87297dd8f28 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -208,6 +208,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
+ null,
null
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
@@ -318,7 +320,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
false,
null,
- null,
autoScalerConfig
);
KinesisSupervisorSpec kinesisSupervisorSpec =
supervisor.getKinesisSupervisorSpec();
@@ -392,7 +393,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
false,
null,
- null,
autoScalerConfig
);
@@ -509,26 +509,26 @@ public class KinesisSupervisorTest extends EasyMockSupport
{
// create KinesisSupervisorIOConfig with autoScalerConfig null
KinesisSupervisorIOConfig
kinesisSupervisorIOConfigWithNullAutoScalerConfig = new
KinesisSupervisorIOConfig(
- STREAM,
- INPUT_FORMAT,
- "awsEndpoint",
- null,
- 1,
- 1,
- new Period("PT30M"),
- new Period("P1D"),
- new Period("PT30S"),
- false,
- new Period("PT30M"),
- null,
- null,
- null,
- 100,
- 1000,
- null,
- null,
- null,
- false
+ STREAM,
+ INPUT_FORMAT,
+ "awsEndpoint",
+ null,
+ 1,
+ 1,
+ new Period("PT30M"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ 100,
+ 1000,
+ null,
+ null,
+ null,
+ false
);
AutoScalerConfig autoscalerConfigNull =
kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoScalerConfig();
@@ -536,26 +536,26 @@ public class KinesisSupervisorTest extends EasyMockSupport
// create KinesisSupervisorIOConfig with autoScalerConfig Empty
KinesisSupervisorIOConfig
kinesisSupervisorIOConfigWithEmptyAutoScalerConfig = new
KinesisSupervisorIOConfig(
- STREAM,
- INPUT_FORMAT,
- "awsEndpoint",
- null,
- 1,
- 1,
- new Period("PT30M"),
- new Period("P1D"),
- new Period("PT30S"),
- false,
- new Period("PT30M"),
- null,
- null,
- null,
- 100,
- 1000,
- null,
- null,
- OBJECT_MAPPER.convertValue(new HashMap<>(),
AutoScalerConfig.class),
- false
+ STREAM,
+ INPUT_FORMAT,
+ "awsEndpoint",
+ null,
+ 1,
+ 1,
+ new Period("PT30M"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ 100,
+ 1000,
+ null,
+ null,
+ OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
+ false
);
AutoScalerConfig autoscalerConfig =
kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoScalerConfig();
@@ -3740,7 +3740,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
new Period("P1D"),
new Period("P1D"),
false,
- 42,
1000,
true
);
@@ -3838,7 +3837,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
new Period("P1D"),
new Period("P1D"),
false,
- 42,
1000,
false
);
@@ -3924,7 +3922,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
new Period("P1D"),
false,
42,
- 42,
dataSchema,
tuningConfig
);
@@ -3967,7 +3964,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
42, // This property is different from tuningConfig
+ 1_000_000,
null,
null,
null,
@@ -4074,7 +4073,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
new Period("P1D"),
false,
42,
- 42,
dataSchema,
tuningConfig
);
@@ -5149,6 +5147,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
+ null,
null
);
@@ -5198,7 +5198,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
earlyMessageRejectionPeriod,
false,
null,
- null,
null
);
}
@@ -5211,7 +5210,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
boolean suspended,
- Integer recordsPerFetch,
Integer fetchDelayMillis,
AutoScalerConfig autoScalerConfig
)
@@ -5231,7 +5229,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
- recordsPerFetch,
+ null,
fetchDelayMillis,
null,
null,
@@ -5301,7 +5299,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
boolean suspended,
- Integer recordsPerFetch,
Integer fetchDelayMillis,
boolean isTaskCurrentReturn
)
@@ -5321,7 +5318,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
- recordsPerFetch,
+ null,
fetchDelayMillis,
null,
null,
@@ -5389,7 +5386,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
boolean suspended,
- Integer recordsPerFetch,
Integer fetchDelayMillis,
DataSchema dataSchema,
KinesisSupervisorTuningConfig tuningConfig
@@ -5410,7 +5406,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
null,
- recordsPerFetch,
+ null,
fetchDelayMillis,
null,
null,
@@ -5559,9 +5555,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
"awsEndpoint",
null,
null,
- null,
- null,
- false
+ null
),
Collections.emptyMap(),
false,
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
index 70611266efe..ac84d2105cd 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
@@ -52,6 +52,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends
KinesisIndexTaskTu
@JsonProperty("resetOffsetAutomatically") Boolean
resetOffsetAutomatically,
@JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean
skipSequenceNumberAvailabilityCheck,
@JsonProperty("recordBufferSize") Integer recordBufferSize,
+ @JsonProperty("recordBufferSizeBytes") Integer recordBufferSizeBytes,
@JsonProperty("recordBufferOfferTimeout") Integer
recordBufferOfferTimeout,
@JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
@JsonProperty("fetchThreads") Integer fetchThreads,
@@ -60,6 +61,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends
KinesisIndexTaskTu
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer
maxSavedParseExceptions,
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
+ @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll,
@JsonProperty("intermediateHandoffPeriod") @Nullable Period
intermediateHandoffPeriod,
@JsonProperty("extra") String extra
)
@@ -81,6 +83,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends
KinesisIndexTaskTu
resetOffsetAutomatically,
skipSequenceNumberAvailabilityCheck,
recordBufferSize,
+ recordBufferSizeBytes,
recordBufferOfferTimeout,
recordBufferFullWait,
fetchThreads,
@@ -89,6 +92,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends
KinesisIndexTaskTu
maxParseExceptions,
maxSavedParseExceptions,
maxRecordsPerPoll,
+ maxBytesPerPoll,
intermediateHandoffPeriod
);
this.extra = extra;
@@ -113,6 +117,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig
extends KinesisIndexTaskTu
base.isResetOffsetAutomatically(),
base.isSkipSequenceNumberAvailabilityCheck(),
base.getRecordBufferSizeConfigured(),
+ base.getRecordBufferSizeBytesConfigured(),
base.getRecordBufferOfferTimeout(),
base.getRecordBufferFullWait(),
base.getFetchThreads(),
@@ -121,6 +126,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig
extends KinesisIndexTaskTu
base.getMaxParseExceptions(),
base.getMaxSavedParseExceptions(),
base.getMaxRecordsPerPollConfigured(),
+ base.getMaxBytesPerPollConfigured(),
base.getIntermediateHandoffPeriod()
);
this.extra = extra;
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueue.java
b/processing/src/main/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueue.java
new file mode 100644
index 00000000000..49105f3a833
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueue.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Stream;
+
+/**
+ * Similar to LinkedBlockingQueue but can be bounded by the total byte size of
the items present in the queue
+ * rather than number of items.
+ */
+public class MemoryBoundLinkedBlockingQueue<T>
+{
+ private final long memoryBound;
+ private final AtomicLong currentMemory;
+ private final LinkedBlockingQueue<ObjectContainer<T>> queue;
+ private final ReentrantLock putLock = new ReentrantLock();
+ private final Condition notFull = putLock.newCondition();
+
+ public MemoryBoundLinkedBlockingQueue(long memoryBound)
+ {
+ this(new LinkedBlockingQueue<>(), memoryBound);
+ }
+
+ @VisibleForTesting
+ MemoryBoundLinkedBlockingQueue(LinkedBlockingQueue<ObjectContainer<T>>
queue, long memoryBound)
+ {
+ this.memoryBound = memoryBound;
+ this.currentMemory = new AtomicLong(0L);
+ this.queue = queue;
+ }
+
+ // returns true/false depending on whether item was added or not
+ public boolean offer(ObjectContainer<T> item)
+ {
+ final long itemLength = item.getSize();
+
+ if (currentMemory.addAndGet(itemLength) <= memoryBound) {
+ if (queue.offer(item)) {
+ return true;
+ }
+ }
+ currentMemory.addAndGet(-itemLength);
+ return false;
+ }
+
+ public boolean offer(ObjectContainer<T> item, long timeout, TimeUnit unit)
throws InterruptedException
+ {
+ final long itemLength = item.getSize();
+
+ long nanos = unit.toNanos(timeout);
+ final ReentrantLock putLock = this.putLock;
+ putLock.lockInterruptibly();
+ try {
+ while (currentMemory.get() + itemLength > memoryBound) {
+ if (nanos <= 0L) {
+ return false;
+ }
+ nanos = notFull.awaitNanos(nanos);
+ }
+ if (currentMemory.addAndGet(itemLength) <= memoryBound) {
+ if (queue.offer(item, timeout, unit)) {
+ return true;
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ currentMemory.addAndGet(-itemLength);
+ throw e;
+ }
+ finally {
+ putLock.unlock();
+ }
+ currentMemory.addAndGet(-itemLength);
+ return false;
+ }
+
+ // blocks until at least one item is available to take
+ public ObjectContainer<T> take() throws InterruptedException
+ {
+ final ObjectContainer<T> ret = queue.take();
+ currentMemory.addAndGet(-ret.getSize());
+ signalNotFull();
+ return ret;
+ }
+
+ public Stream<ObjectContainer<T>> stream()
+ {
+ return queue.stream();
+ }
+
+ /**
+ * Drain up to the specified number of bytes worth of items from the queue
into the provided buffer. At least
+ * one record is drained if any becomes available within the timeout,
regardless of the byte limit specified.
+ *
+ * @param buffer The buffer to drain queue items into.
+ * @param bytesToDrain The amount of bytes to drain from the queue
+ * @param timeout The maximum time allowed to drain the queue
+ * @param unit The time unit of the timeout.
+ *
+ * @return The number of items drained from the queue.
+ * @throws InterruptedException
+ */
+ public int drain(Collection<? super ObjectContainer<T>> buffer, int
bytesToDrain, long timeout, TimeUnit unit)
+ throws InterruptedException
+ {
+ Preconditions.checkNotNull(buffer);
+ boolean signalNotFull = false;
+ try {
+ long deadline = System.nanoTime() + unit.toNanos(timeout);
+ int added = 0;
+ long bytesAdded = 0;
+ while (bytesAdded < bytesToDrain) {
+ ObjectContainer<T> e = queue.poll(deadline - System.nanoTime(),
TimeUnit.NANOSECONDS);
+ if (e == null) {
+ break;
+ }
+ currentMemory.addAndGet(-e.getSize());
+ signalNotFull = true;
+ buffer.add(e);
+ ++added;
+ bytesAdded += e.getSize();
+ e = queue.peek();
+ if (e != null && (bytesAdded + e.getSize()) > bytesToDrain) {
+ break;
+ }
+ }
+ return added;
+ }
+ finally {
+ if (signalNotFull) {
+ signalNotFull();
+ }
+ }
+ }
+
+ public int size()
+ {
+ return queue.size();
+ }
+
+ public long byteSize()
+ {
+ return currentMemory.get();
+ }
+
+ public long remainingCapacity()
+ {
+ return memoryBound - currentMemory.get();
+ }
+
+ private void signalNotFull()
+ {
+ final ReentrantLock putLock = this.putLock;
+ putLock.lock();
+ try {
+ notFull.signal();
+ }
+ finally {
+ putLock.unlock();
+ }
+ }
+
+ public static class ObjectContainer<T>
+ {
+ private final T data;
+ private final long size;
+
+ public ObjectContainer(T data, long size)
+ {
+ this.data = data;
+ this.size = size;
+ }
+
+ public T getData()
+ {
+ return data;
+ }
+
+ public long getSize()
+ {
+ return size;
+ }
+ }
+}
+
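The core idea of the new class above is to reserve bytes optimistically with `AtomicLong.addAndGet` and roll the reservation back if the item does not fit. A minimal standalone sketch of that reserve-then-offer accounting (simplified names, not the Druid class itself, with the item sizer passed in rather than carried by an `ObjectContainer`):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToLongFunction;

// Standalone sketch of the byte-bounded offer/take accounting used by
// MemoryBoundLinkedBlockingQueue. Names are simplified for illustration.
public class ByteBoundedQueue<T>
{
  private final long memoryBound;
  private final AtomicLong currentMemory = new AtomicLong(0L);
  private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
  private final ToLongFunction<T> sizer;

  public ByteBoundedQueue(long memoryBound, ToLongFunction<T> sizer)
  {
    this.memoryBound = memoryBound;
    this.sizer = sizer;
  }

  // Reserve the item's bytes first; undo the reservation if it doesn't fit.
  public boolean offer(T item)
  {
    final long size = sizer.applyAsLong(item);
    if (currentMemory.addAndGet(size) <= memoryBound && queue.offer(item)) {
      return true;
    }
    currentMemory.addAndGet(-size);
    return false;
  }

  // Blocks until an item is available, then releases its byte reservation.
  public T take() throws InterruptedException
  {
    final T item = queue.take();
    currentMemory.addAndGet(-sizer.applyAsLong(item));
    return item;
  }

  public long byteSize()
  {
    return currentMemory.get();
  }
}
```

Because the reservation happens before the enqueue, a concurrent `offer` that would push the total past `memoryBound` fails cleanly instead of over-filling the buffer.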
diff --git
a/processing/src/test/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueueTest.java
b/processing/src/test/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueueTest.java
new file mode 100644
index 00000000000..ec36d83f250
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueueTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class MemoryBoundLinkedBlockingQueueTest
+{
+ @Test
+ public void test_offer_emptyQueueWithEnoughCapacity_true()
+ {
+
+ long byteCapacity = 10L;
+ MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity,
ImmutableList.of());
+ byte[] item = "item".getBytes(StandardCharsets.UTF_8);
+
+ boolean succeeds = queue.offer(new
MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item, item.length));
+
+ long expectedByteSize = item.length;
+ Assert.assertTrue(succeeds);
+ Assert.assertEquals(1, queue.size());
+ Assert.assertEquals(expectedByteSize, queue.byteSize());
+ Assert.assertEquals(byteCapacity - item.length, queue.remainingCapacity());
+ }
+
+ @Test
+ public void test_offer_nonEmptyQueueWithEnoughCapacity_true()
+ {
+
+ long byteCapacity = 10L;
+ byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+ byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items =
buildItemContainers(
+ ImmutableList.of(item1)
+ );
+ MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity,
items);
+
+ boolean succeeds = queue.offer(new
MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item2, item2.length));
+
+ long expectedByteSize = item1.length + item2.length;
+ Assert.assertTrue(succeeds);
+ Assert.assertEquals(2, queue.size());
+ Assert.assertEquals(expectedByteSize, queue.byteSize());
+ Assert.assertEquals(byteCapacity - expectedByteSize,
queue.remainingCapacity());
+ }
+
+ @Test
+ public void test_offer_queueWithoutEnoughCapacity_false()
+ {
+
+ long byteCapacity = 7L;
+ byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+ byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items =
buildItemContainers(
+ ImmutableList.of(item1)
+ );
+ MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity,
items);
+
+ boolean succeeds = queue.offer(new
MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item2, item2.length));
+
+ long expectedByteSize = item1.length;
+ Assert.assertFalse(succeeds);
+ Assert.assertEquals(1, queue.size());
+ Assert.assertEquals(expectedByteSize, queue.byteSize());
+ Assert.assertEquals(byteCapacity - expectedByteSize,
queue.remainingCapacity());
+ }
+
+ @Test
+ public void test_offerWithTimeLimit_interruptedExceptionThrown_throws()
+ {
+ long byteCapacity = 10L;
+ MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(
+ byteCapacity,
+ ImmutableList.of(),
+ new InterruptedExceptionThrowingQueue()
+ );
+ byte[] item = "item".getBytes(StandardCharsets.UTF_8);
+
+ Assert.assertThrows(
+ InterruptedException.class,
+ () -> queue.offer(
+ new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item,
item.length),
+ 1L,
+ TimeUnit.MILLISECONDS
+ )
+ );
+
+ Assert.assertEquals(0, queue.size());
+ Assert.assertEquals(0L, queue.byteSize());
+ Assert.assertEquals(byteCapacity, queue.remainingCapacity());
+ }
+
+ @Test
+ public void test_offerWithTimeLimit_fullQueue_waitsTime() throws
InterruptedException
+ {
+ long timeoutMillis = 2000L;
+ long byteCapacity = 10L;
+ byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+ byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items =
buildItemContainers(
+ ImmutableList.of(item1, item2)
+ );
+ MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(
+ byteCapacity,
+ items,
+ new InterruptedExceptionThrowingQueue()
+ );
+ byte[] item = "item".getBytes(StandardCharsets.UTF_8);
+ long start = System.currentTimeMillis();
+ boolean succeeds = queue.offer(
+ new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item,
item.length),
+ timeoutMillis,
+ TimeUnit.MILLISECONDS
+ );
+ long end = System.currentTimeMillis();
+
+ Assert.assertFalse(succeeds);
+ Assert.assertTrue(
+ StringUtils.format(
+ "offer only waited at most [%d] nanos instead of expected [%d]
nanos",
+ TimeUnit.MILLISECONDS.toNanos(end - start),
+ TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
+ ),
+ TimeUnit.MILLISECONDS.toNanos(end - start) >=
TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
+ );
+ Assert.assertEquals(2, queue.size());
+ Assert.assertEquals(10L, queue.byteSize());
+ Assert.assertEquals(0L, queue.remainingCapacity());
+ }
+
+ @Test
+ public void test_take_nonEmptyQueue_expected() throws InterruptedException
+ {
+
+ long byteCapacity = 10L;
+ byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+ byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+ MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]> item1Container =
+ new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item1,
item1.length);
+ MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]> item2Container =
+ new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item2,
item2.length);
+ MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity,
ImmutableList.of());
+ Assert.assertTrue(queue.offer(item1Container));
+ Assert.assertTrue(queue.offer(item2Container));
+
+ MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]> takenItem =
queue.take();
+ long expectedByteSize = item2.length;
+
+ Assert.assertSame(item1Container, takenItem);
+ Assert.assertEquals(1, queue.size());
+ Assert.assertEquals(expectedByteSize, queue.byteSize());
+ Assert.assertEquals(byteCapacity - expectedByteSize,
queue.remainingCapacity());
+ }
+
+ @Test
+ public void test_drain_emptyQueue_succeeds() throws InterruptedException
+ {
+
+ long byteCapacity = 7L;
+ MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity,
ImmutableList.of());
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> buffer
= new ArrayList<>();
+
+ int numAdded = queue.drain(buffer, 1, 1, TimeUnit.SECONDS);
+
+ Assert.assertTrue(numAdded == 0 && numAdded == buffer.size());
+ Assert.assertEquals(0, queue.size());
+ Assert.assertEquals(0L, queue.byteSize());
+ Assert.assertEquals(byteCapacity, queue.remainingCapacity());
+ }
+
+ @Test
+ public void test_drain_queueWithOneItem_succeeds() throws
InterruptedException
+ {
+
+ long byteCapacity = 7L;
+ byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items =
buildItemContainers(
+ ImmutableList.of(item1)
+ );
+ MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity,
items);
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> buffer
= new ArrayList<>();
+
+ int numAdded = queue.drain(buffer, 1, 1, TimeUnit.MINUTES);
+
+ Assert.assertTrue(numAdded == 1 && numAdded == buffer.size());
+ Assert.assertEquals(0, queue.size());
+ Assert.assertEquals(0L, queue.byteSize());
+ Assert.assertEquals(byteCapacity, queue.remainingCapacity());
+ }
+
+ @Test
+ public void test_drain_queueWithMultipleItems_succeeds() throws
InterruptedException
+ {
+
+ long byteCapacity = 15L;
+ byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+ byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+ byte[] item3 = "item3".getBytes(StandardCharsets.UTF_8);
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items =
buildItemContainers(
+ ImmutableList.of(item1, item2, item3)
+ );
+ MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity,
items, new NotAllDrainedQueue());
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> buffer
= new ArrayList<>();
+
+ int numAdded = queue.drain(buffer, 10, 1, TimeUnit.MINUTES);
+
+ Assert.assertTrue(numAdded == 2 && numAdded == buffer.size());
+ Assert.assertEquals(1, queue.size());
+ Assert.assertEquals(item3.length, queue.byteSize());
+ Assert.assertEquals(byteCapacity - item3.length,
queue.remainingCapacity());
+ }
+
+ @Test
+ public void test_drain_queueWithFirstItemSizeGreaterThanLimit_succeeds()
throws InterruptedException
+ {
+
+ long byteCapacity = 15L;
+ byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8);
+ byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8);
+ byte[] item3 = "item3".getBytes(StandardCharsets.UTF_8);
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> items =
buildItemContainers(
+ ImmutableList.of(item1, item2, item3)
+ );
+ MemoryBoundLinkedBlockingQueue<byte[]> queue = setupQueue(byteCapacity,
items, new NotAllDrainedQueue());
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> buffer
= new ArrayList<>();
+
+ int numAdded = queue.drain(buffer, item1.length - 1, 1, TimeUnit.MINUTES);
+
+ Assert.assertTrue(numAdded == 1 && numAdded == buffer.size());
+ Assert.assertEquals(2, queue.size());
+ Assert.assertEquals(item2.length + item3.length, queue.byteSize());
+ Assert.assertEquals(byteCapacity - (item2.length + item3.length),
queue.remainingCapacity());
+ }
+
+ private static <T> MemoryBoundLinkedBlockingQueue<T> setupQueue(
+ long byteCapacity,
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<T>> items
+ )
+ {
+ return setupQueue(byteCapacity, items, null);
+ }
+
+ private static <T> MemoryBoundLinkedBlockingQueue<T> setupQueue(
+ long byteCapacity,
+ Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<T>> items,
+ @Nullable
LinkedBlockingQueue<MemoryBoundLinkedBlockingQueue.ObjectContainer<T>>
underlyingQueue
+ )
+ {
+ Assert.assertTrue(getTotalSizeOfItems(items) <= byteCapacity);
+ MemoryBoundLinkedBlockingQueue<T> queue = underlyingQueue != null ?
+ new MemoryBoundLinkedBlockingQueue<>(underlyingQueue, byteCapacity) :
new MemoryBoundLinkedBlockingQueue<>(byteCapacity);
+ items.forEach(i -> Assert.assertTrue(queue.offer(i)));
+ return queue;
+ }
+
+ private static
Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>>
buildItemContainers(
+ Collection<byte[]> items
+ )
+ {
+ return items.stream()
+ .map(i -> new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(i,
i.length))
+ .collect(Collectors.toList());
+ }
+
+ private static <T> long
getTotalSizeOfItems(Collection<MemoryBoundLinkedBlockingQueue.ObjectContainer<T>>
items)
+ {
+ return
items.stream().mapToLong(MemoryBoundLinkedBlockingQueue.ObjectContainer::getSize).sum();
+ }
+
+ static class InterruptedExceptionThrowingQueue
+ extends
LinkedBlockingQueue<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>>
+ {
+ @Override
+ public boolean
offer(MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]> item, long
timeout, TimeUnit unit)
+ throws InterruptedException
+ {
+ throw new InterruptedException("exception thrown");
+ }
+ }
+
+ static class NotAllDrainedQueue
+ extends
LinkedBlockingQueue<MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>>
+ {
+ @Override
+ public int drainTo(Collection<? super
MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]>> c, int maxElements)
+ {
+ MemoryBoundLinkedBlockingQueue.ObjectContainer<byte[]> firstItem =
this.poll();
+ c.add(firstItem);
+ return 1;
+ }
+ }
+}
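The drain tests above exercise two rules: at least one available record is always drained, and draining stops early rather than overshoot the byte limit with the next item. A standalone sketch of just that stopping logic (no memory accounting, plain `byte[]` items):

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DrainSketch
{
  // Simplified sketch of the byte-limited drain rule from
  // MemoryBoundLinkedBlockingQueue#drain.
  public static int drain(
      LinkedBlockingQueue<byte[]> queue,
      List<byte[]> buffer,
      int bytesToDrain,
      long timeout,
      TimeUnit unit
  ) throws InterruptedException
  {
    final long deadline = System.nanoTime() + unit.toNanos(timeout);
    int added = 0;
    long bytesAdded = 0;
    while (bytesAdded < bytesToDrain) {
      byte[] e = queue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
      if (e == null) {
        break; // timed out; an empty queue can drain zero items
      }
      buffer.add(e);
      ++added;
      bytesAdded += e.length;
      byte[] next = queue.peek();
      // Stop before an item that would push the batch past the byte limit.
      if (next != null && bytesAdded + next.length > bytesToDrain) {
        break;
      }
    }
    return added;
  }
}
```

With three 5-byte items and a 10-byte limit this drains two items, matching `test_drain_queueWithMultipleItems_succeeds`; with a limit smaller than the first item it still drains that one item.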
diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
index 3594f70c992..41f77593d28 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
@@ -1222,13 +1222,6 @@ export function getIoConfigTuningFormFields(
</>
),
},
- {
- name: 'recordsPerFetch',
- type: 'number',
- defaultValue: 4000,
- defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
- info: <>The number of records to request per GetRecords call to
Kinesis.</>,
- },
{
name: 'pollTimeout',
type: 'number',
@@ -1249,13 +1242,6 @@ export function getIoConfigTuningFormFields(
defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
info: <>Time in milliseconds to wait between subsequent GetRecords
calls to Kinesis.</>,
},
- {
- name: 'deaggregate',
- type: 'boolean',
- defaultValue: false,
- defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
- info: <>Whether to use the de-aggregate function of the KCL.</>,
- },
{
name: 'startDelay',
type: 'duration',
@@ -1443,7 +1429,7 @@ export interface TuningConfig {
offsetFetchPeriod?: string;
maxParseExceptions?: number;
maxSavedParseExceptions?: number;
- recordBufferSize?: number;
+ recordBufferSizeBytes?: number;
recordBufferOfferTimeout?: number;
recordBufferFullWait?: number;
fetchThreads?: number;
@@ -2055,13 +2041,13 @@ const TUNING_FORM_FIELDS: Field<IngestionSpec>[] = [
),
},
{
- name: 'spec.tuningConfig.recordBufferSize',
+ name: 'spec.tuningConfig.recordBufferSizeBytes',
type: 'number',
- defaultValue: 10000,
+ defaultValue: 100000000,
defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
info: (
<>
- Size of the buffer (number of events) used between the Kinesis fetch
threads and the main
+ Size of the buffer (heap memory bytes) used between the Kinesis fetch
threads and the main
ingestion thread.
</>
),
@@ -2106,15 +2092,15 @@ const TUNING_FORM_FIELDS: Field<IngestionSpec>[] = [
),
},
{
- name: 'spec.tuningConfig.maxRecordsPerPoll',
+ name: 'spec.tuningConfig.maxBytesPerPoll',
type: 'number',
- defaultValue: 100,
+ defaultValue: 1000000,
defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
hideInMore: true,
info: (
<>
- The maximum number of records/events to be fetched from buffer per
poll. The actual maximum
- will be <Code>max(maxRecordsPerPoll, max(bufferSize, 1))</Code>.
+ The maximum number of bytes to be fetched from buffer per poll. At
least one record will be
+ fetched regardless of config.
</>
),
},
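For supervisor specs, the web-console changes above mean the record-count knobs give way to byte-based ones. An illustrative `tuningConfig` fragment using the new field names with the defaults shown in the form fields (the surrounding spec structure is assumed; consult the Kinesis ingestion docs for the full spec):

```json
{
  "tuningConfig": {
    "type": "kinesis",
    "recordBufferSizeBytes": 100000000,
    "maxBytesPerPoll": 1000000
  }
}
```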
diff --git a/website/.spelling b/website/.spelling
index 59585a52b2d..f507d8e646f 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1125,6 +1125,7 @@
LZ4LZFuncompressedLZ4LZ4LZFuncompressednoneLZ4autolongsautolongslongstypeconcise
deaggregate
druid-kinesis-indexing-service
maxRecordsPerPoll
+maxBytesPerPoll
maxRecordsPerPollrecordsPerFetchfetchDelayMillisreplicasfetchDelayMillisrecordsPerFetchfetchDelayMillismaxRecordsPerPollamazon-kinesis-client1
numKinesisShards
numProcessors
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]