This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d294404924 Kinesis ingestion with empty shards (#12792)
d294404924 is described below
commit d294404924579526ca1396d004b499e59fd43bcb
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Fri Aug 5 22:38:58 2022 +0530
Kinesis ingestion with empty shards (#12792)
Kinesis ingestion requires all shards to have at least one record at the
required position in Druid.
Even if this is satisfied initially, resharding the stream can lead to
empty intermediate shards. A significant delay in writing to newly created
shards was also problematic.
Kinesis shard sequence numbers are big integers. Introduce two more custom
sequence tokens, UNREAD_TRIM_HORIZON and UNREAD_LATEST, to indicate that a shard
has not been read from and that it needs to be read from the start or the end,
respectively.
These values can be used to avoid the need to read at least one record to
obtain a sequence number for ingesting a newly discovered shard.
If a record cannot be obtained immediately, use the marker to obtain the
relevant shardIterator, and use this shardIterator to obtain a valid sequence
number. Until a valid sequence number is obtained, continue storing the
token as the offset.
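The token-to-iterator mapping described above can be sketched as follows. This is a standalone simplification, not the actual KinesisRecordSupplier code; the class name and the string constants for iterator types are illustrative:

```java
// Standalone sketch: map an offset token to the Kinesis shard iterator type
// used to seek. A simplification of the idea above, not the Druid code itself.
public class UnreadTokenSeek
{
    static final String UNREAD_TRIM_HORIZON = "UNREAD_TRIM_HORIZON";
    static final String UNREAD_LATEST = "UNREAD_LATEST";

    // An UNREAD token carries no sequence number, so seek from the start or
    // end of the shard instead of at a specific sequence number.
    static String iteratorTypeFor(String offsetToken)
    {
        if (UNREAD_TRIM_HORIZON.equals(offsetToken)) {
            return "TRIM_HORIZON";
        }
        if (UNREAD_LATEST.equals(offsetToken)) {
            return "LATEST";
        }
        return "AT_SEQUENCE_NUMBER";
    }

    public static void main(String[] args)
    {
        System.out.println(iteratorTypeFor(UNREAD_TRIM_HORIZON));
        System.out.println(iteratorTypeFor("49590338271490256608559692538361"));
    }
}
```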
These tokens (UNREAD_TRIM_HORIZON and UNREAD_LATEST) are logically ordered
to be earlier than any valid sequence number.
However, the ordering requires a few subtle changes to the existing
mechanism for record sequence validation:
The sequence availability check ensures that the current offset is not earlier
than the earliest available sequence in the shard. However, when the current
token is an UNREAD token, any sequence number in the shard is valid (despite
the ordering).
Kinesis sequence numbers are inclusive, i.e., if current sequence == end
sequence, there are more records left to read.
However, the equality check is exclusive when dealing with UNREAD tokens.
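The ordering of UNREAD tokens relative to numeric sequence numbers and end-of-shard markers can be sketched as below. This is a standalone simplification under assumed names, not the actual KinesisSequenceNumber class; only one max-valued marker is modeled:

```java
import java.math.BigInteger;

// Standalone sketch of the ordering: UNREAD tokens sort before every numeric
// AWS sequence number; end-of-shard style markers sort after every one.
public class UnreadTokenOrdering
{
    static final String UNREAD_TRIM_HORIZON = "UNREAD_TRIM_HORIZON";
    static final String UNREAD_LATEST = "UNREAD_LATEST";
    static final String END_OF_SHARD_MARKER = "EOS"; // illustrative value

    static boolean isUnread(String seq)
    {
        return UNREAD_TRIM_HORIZON.equals(seq) || UNREAD_LATEST.equals(seq);
    }

    static int compare(String a, String b)
    {
        // UNREAD tokens are logically earlier than any valid sequence number.
        if (isUnread(a) && isUnread(b)) {
            return 0;
        } else if (isUnread(a)) {
            return -1;
        } else if (isUnread(b)) {
            return 1;
        }
        // Max-valued markers are logically later than any valid sequence number.
        boolean aMax = END_OF_SHARD_MARKER.equals(a);
        boolean bMax = END_OF_SHARD_MARKER.equals(b);
        if (aMax && bMax) {
            return 0;
        } else if (aMax) {
            return 1;
        } else if (bMax) {
            return -1;
        }
        // Valid AWS Kinesis sequence numbers compare as big integers.
        return new BigInteger(a).compareTo(new BigInteger(b));
    }

    public static void main(String[] args)
    {
        System.out.println(compare(UNREAD_LATEST, "12345") < 0);
        System.out.println(compare(UNREAD_TRIM_HORIZON, UNREAD_LATEST) == 0);
        System.out.println(compare(END_OF_SHARD_MARKER, "12345") > 0);
    }
}
```

Because both UNREAD tokens compare as equal and earlier than every numeric sequence, the availability and end-offset checks need the special cases described above.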
---
.../extensions-core/kinesis-ingestion.md | 1 -
.../druid/indexing/kafka/KafkaRecordSupplier.java | 9 +
.../druid/indexing/kinesis/KinesisIndexTask.java | 1 -
.../indexing/kinesis/KinesisIndexTaskRunner.java | 9 +-
.../kinesis/KinesisIndexTaskTuningConfig.java | 15 --
.../indexing/kinesis/KinesisRecordSupplier.java | 185 +++++++++++++++------
.../druid/indexing/kinesis/KinesisSamplerSpec.java | 1 -
.../indexing/kinesis/KinesisSequenceNumber.java | 60 ++++++-
.../kinesis/supervisor/KinesisSupervisor.java | 1 -
.../supervisor/KinesisSupervisorTuningConfig.java | 5 -
.../kinesis/KinesisIndexTaskSerdeTest.java | 1 -
.../indexing/kinesis/KinesisIndexTaskTest.java | 1 -
.../kinesis/KinesisIndexTaskTuningConfigTest.java | 8 -
.../kinesis/KinesisRecordSupplierTest.java | 130 +++++++++++----
.../kinesis/supervisor/KinesisSupervisorTest.java | 37 ++++-
.../TestModifiedKinesisIndexTaskTuningConfig.java | 3 -
.../SeekableStreamIndexTaskRunner.java | 8 +-
.../common/OrderedSequenceNumber.java | 20 +++
.../seekablestream/common/RecordSupplier.java | 8 +
.../supervisor/SeekableStreamSupervisor.java | 16 +-
.../overlord/sampler/InputSourceSamplerTest.java | 7 +
.../RecordSupplierInputSourceTest.java | 7 +
.../druid/testing/utils/KinesisEventWriter.java | 5 +
.../utils/KinesisSingleShardEventWriter.java | 44 +++++
...tractKinesisIndexingServiceEmptyShardsTest.java | 45 +++++
...isIndexingServiceEmptyShardsSerializedTest.java | 74 +++++++++
web-console/src/druid-models/ingestion-spec.tsx | 16 --
27 files changed, 552 insertions(+), 165 deletions(-)
diff --git a/docs/development/extensions-core/kinesis-ingestion.md
b/docs/development/extensions-core/kinesis-ingestion.md
index 63d2e9a6a0..a22c33bcfd 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -300,7 +300,6 @@ The `tuningConfig` is optional. If no `tuningConfig` is
specified, default param
|`recordBufferSize`|Integer|Size of the buffer (number of events) used between
the Kinesis fetch threads and the main ingestion thread.|no (default == 10000)|
|`recordBufferOfferTimeout`|Integer|Length of time in milliseconds to wait for
space to become available in the buffer before timing out.| no (default ==
5000)|
|`recordBufferFullWait`|Integer|Length of time in milliseconds to wait for the
buffer to drain before attempting to fetch records from Kinesis again.|no
(default == 5000)|
-|`fetchSequenceNumberTimeout`|Integer|Length of time in milliseconds to wait
for Kinesis to return the earliest or latest sequence number for a shard.
Kinesis will not return the latest sequence number if no data is actively being
written to that shard. In this case, this fetch call will repeatedly timeout
and retry until fresh data is written to the stream.|no (default == 60000)|
|`fetchThreads`|Integer|Size of the pool of threads fetching data from
Kinesis. There is no benefit in having more threads than Kinesis shards.|no
(default == procs * 2, where "procs" is the number of processors on the server
that the task is running on)
|
|`segmentWriteOutMediumFactory`|Object|Segment write-out medium to use when
creating segments. See below for more information.|no (not specified by
default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type`
is used)|
|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand
off segments. Handoff will happen either if `maxRowsPerSegment` or
`maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens
earlier.| no (default == P2147483647D)|
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index fe32ffe5ff..d64c0a64c5 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -26,6 +26,7 @@ import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
@@ -157,6 +158,14 @@ public class KafkaRecordSupplier implements
RecordSupplier<Integer, Long, KafkaR
return nextPos;
}
+ @Override
+ public boolean isOffsetAvailable(StreamPartition<Integer> partition,
OrderedSequenceNumber<Long> offset)
+ {
+ final Long earliestOffset = getEarliestSequenceNumber(partition);
+ return earliestOffset != null
+ &&
offset.isAvailableWithEarliest(KafkaSequenceNumber.of(earliestOffset));
+ }
+
@Override
public Long getPosition(StreamPartition<Integer> partition)
{
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index 48f5d26e24..a0b31f3116 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -106,7 +106,6 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String, By
tuningConfig.getRecordBufferSize(),
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
- tuningConfig.getFetchSequenceNumberTimeout(),
tuningConfig.getMaxRecordsPerPoll(),
false,
useListShards
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index 4ab66a6e1c..22448ba67a 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -127,9 +127,7 @@ public class KinesisIndexTaskRunner extends
SeekableStreamIndexTaskRunner<String
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
- String earliestSequenceNumber =
recordSupplier.getEarliestSequenceNumber(streamPartition);
- if (earliestSequenceNumber == null
- ||
createSequenceNumber(earliestSequenceNumber).compareTo(createSequenceNumber(sequence))
> 0) {
+ if (!recordSupplier.isOffsetAvailable(streamPartition,
KinesisSequenceNumber.of(sequence))) {
if (task.getTuningConfig().isResetOffsetAutomatically()) {
log.info("Attempting to reset sequences automatically for all
partitions");
try {
@@ -144,10 +142,9 @@ public class KinesisIndexTaskRunner extends
SeekableStreamIndexTaskRunner<String
}
} else {
throw new ISE(
- "Starting sequenceNumber [%s] is no longer available for
partition [%s] (earliest: [%s]) and resetOffsetAutomatically is not enabled",
+ "Starting sequenceNumber [%s] is no longer available for
partition [%s] and resetOffsetAutomatically is not enabled",
sequence,
- streamPartition.getPartitionId(),
- earliestSequenceNumber
+ streamPartition.getPartitionId()
);
}
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index abf27936b6..21c0510758 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -39,13 +39,11 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
private static final int DEFAULT_RECORD_BUFFER_SIZE = 10000;
private static final int DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT = 5000;
private static final int DEFAULT_RECORD_BUFFER_FULL_WAIT = 5000;
- private static final int DEFAULT_FETCH_SEQUENCE_NUMBER_TIMEOUT = 20000;
private static final int DEFAULT_MAX_RECORDS_PER_POLL = 100;
private final int recordBufferSize;
private final int recordBufferOfferTimeout;
private final int recordBufferFullWait;
- private final int fetchSequenceNumberTimeout;
private final Integer fetchThreads;
private final int maxRecordsPerPoll;
@@ -69,7 +67,6 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
@JsonProperty("recordBufferSize") Integer recordBufferSize,
@JsonProperty("recordBufferOfferTimeout") Integer
recordBufferOfferTimeout,
@JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
- @JsonProperty("fetchSequenceNumberTimeout") Integer
fetchSequenceNumberTimeout,
@JsonProperty("fetchThreads") Integer fetchThreads,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@@ -106,8 +103,6 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
? DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT
: recordBufferOfferTimeout;
this.recordBufferFullWait = recordBufferFullWait == null ?
DEFAULT_RECORD_BUFFER_FULL_WAIT : recordBufferFullWait;
- this.fetchSequenceNumberTimeout = fetchSequenceNumberTimeout
- == null ?
DEFAULT_FETCH_SEQUENCE_NUMBER_TIMEOUT : fetchSequenceNumberTimeout;
this.fetchThreads = fetchThreads; // we handle this being null later
this.maxRecordsPerPoll = maxRecordsPerPoll == null ?
DEFAULT_MAX_RECORDS_PER_POLL : maxRecordsPerPoll;
@@ -135,12 +130,6 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
return recordBufferFullWait;
}
- @JsonProperty
- public int getFetchSequenceNumberTimeout()
- {
- return fetchSequenceNumberTimeout;
- }
-
@JsonProperty
public Integer getFetchThreads()
{
@@ -175,7 +164,6 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
getRecordBufferSize(),
getRecordBufferOfferTimeout(),
getRecordBufferFullWait(),
- getFetchSequenceNumberTimeout(),
getFetchThreads(),
getSegmentWriteOutMediumFactory(),
isLogParseExceptions(),
@@ -202,7 +190,6 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
return recordBufferSize == that.recordBufferSize &&
recordBufferOfferTimeout == that.recordBufferOfferTimeout &&
recordBufferFullWait == that.recordBufferFullWait &&
- fetchSequenceNumberTimeout == that.fetchSequenceNumberTimeout &&
maxRecordsPerPoll == that.maxRecordsPerPoll &&
Objects.equals(fetchThreads, that.fetchThreads);
}
@@ -215,7 +202,6 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
recordBufferSize,
recordBufferOfferTimeout,
recordBufferFullWait,
- fetchSequenceNumberTimeout,
fetchThreads,
maxRecordsPerPoll
);
@@ -241,7 +227,6 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
", recordBufferSize=" + recordBufferSize +
", recordBufferOfferTimeout=" + recordBufferOfferTimeout +
", recordBufferFullWait=" + recordBufferFullWait +
- ", fetchSequenceNumberTimeout=" + fetchSequenceNumberTimeout +
", fetchThreads=" + fetchThreads +
", segmentWriteOutMediumFactory=" +
getSegmentWriteOutMediumFactory() +
", logParseExceptions=" + isLogParseExceptions() +
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index b89e89b726..be3fe419ab 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -53,6 +53,7 @@ import org.apache.druid.common.aws.AWSCredentialsUtils;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
@@ -400,7 +401,6 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Byt
private final boolean deaggregate;
private final int recordBufferOfferTimeout;
private final int recordBufferFullWait;
- private final int fetchSequenceNumberTimeout;
private final int maxRecordsPerPoll;
private final int fetchThreads;
private final int recordBufferSize;
@@ -426,7 +426,6 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Byt
int recordBufferSize,
int recordBufferOfferTimeout,
int recordBufferFullWait,
- int fetchSequenceNumberTimeout,
int maxRecordsPerPoll,
boolean useEarliestSequenceNumber,
boolean useListShards
@@ -439,7 +438,6 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Byt
this.deaggregate = deaggregate;
this.recordBufferOfferTimeout = recordBufferOfferTimeout;
this.recordBufferFullWait = recordBufferFullWait;
- this.fetchSequenceNumberTimeout = fetchSequenceNumberTimeout;
this.maxRecordsPerPoll = maxRecordsPerPoll;
this.fetchThreads = fetchThreads;
this.recordBufferSize = recordBufferSize;
@@ -596,7 +594,13 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Byt
public void seek(StreamPartition<String> partition, String sequenceNumber)
throws InterruptedException
{
filterBufferAndResetBackgroundFetch(ImmutableSet.of(partition));
- partitionSeek(partition, sequenceNumber,
ShardIteratorType.AT_SEQUENCE_NUMBER);
+ if (KinesisSequenceNumber.UNREAD_TRIM_HORIZON.equals(sequenceNumber)) {
+ partitionSeek(partition, null, ShardIteratorType.TRIM_HORIZON);
+ } else if (KinesisSequenceNumber.UNREAD_LATEST.equals(sequenceNumber)) {
+ partitionSeek(partition, null, ShardIteratorType.LATEST);
+ } else {
+ partitionSeek(partition, sequenceNumber,
ShardIteratorType.AT_SEQUENCE_NUMBER);
+ }
}
@Override
@@ -666,6 +670,75 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Byt
return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON);
}
+ @Override
+ public boolean isOffsetAvailable(StreamPartition<String> partition,
OrderedSequenceNumber<String> offset)
+ {
+ return wrapExceptions(() -> {
+ KinesisSequenceNumber kinesisSequence = (KinesisSequenceNumber) offset;
+ // No records have been read from the stream and any record is valid
+ if (kinesisSequence.isUnread()) {
+ return true;
+ }
+ // Any other custom sequence number
+ if
(!KinesisSequenceNumber.isValidAWSKinesisSequence(kinesisSequence.get())) {
+ return false;
+ }
+ // The first record using AT_SEQUENCE_NUMBER should match the offset
+ // Should not return empty records provided the record is present
+ // Reference:
https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html
+ // Section: GetRecords Returns Empty Records Array Even When There is
Data in the Stream
+ String shardIterator = RetryUtils.retry(
+ () -> kinesis.getShardIterator(partition.getStream(),
+ partition.getPartitionId(),
+
ShardIteratorType.AT_SEQUENCE_NUMBER.name(),
+ kinesisSequence.get())
+ .getShardIterator(),
+ (throwable) -> {
+ if (throwable instanceof ProvisionedThroughputExceededException) {
+ log.warn(
+ throwable,
+ "encountered ProvisionedThroughputExceededException while
fetching records, this means "
+ + "that the request rate for the stream is too high, or the
requested data is too large for "
+ + "the available throughput. Reduce the frequency or size of
your requests. Consider increasing "
+ + "the number of shards to increase throughput."
+ );
+ return true;
+ }
+ if (throwable instanceof AmazonClientException) {
+ AmazonClientException ase = (AmazonClientException) throwable;
+ return AWSClientUtil.isClientExceptionRecoverable(ase);
+ }
+ return false;
+ },
+ GET_SEQUENCE_NUMBER_RETRY_COUNT
+ );
+ GetRecordsRequest getRecordsRequest = new
GetRecordsRequest().withShardIterator(shardIterator);
+ List<Record> records = RetryUtils.retry(
+ () -> kinesis.getRecords(getRecordsRequest)
+ .getRecords(),
+ (throwable) -> {
+ if (throwable instanceof ProvisionedThroughputExceededException) {
+ log.warn(
+ throwable,
+ "encountered ProvisionedThroughputExceededException while
fetching records, this means "
+ + "that the request rate for the stream is too high, or the
requested data is too large for "
+ + "the available throughput. Reduce the frequency or size of
your requests. Consider increasing "
+ + "the number of shards to increase throughput."
+ );
+ return true;
+ }
+ if (throwable instanceof AmazonClientException) {
+ AmazonClientException ase = (AmazonClientException) throwable;
+ return AWSClientUtil.isClientExceptionRecoverable(ase);
+ }
+ return false;
+ },
+ GET_SEQUENCE_NUMBER_RETRY_COUNT
+ );
+ return !records.isEmpty() &&
records.get(0).getSequenceNumber().equals(kinesisSequence.get());
+ });
+ }
+
public Set<Shard> getShards(String stream)
{
if (useListShards) {
@@ -743,11 +816,12 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Byt
{
Map<String, Long> partitionLag =
Maps.newHashMapWithExpectedSize(currentOffsets.size());
for (Map.Entry<String, String> partitionOffset :
currentOffsets.entrySet()) {
+ StreamPartition<String> partition = new StreamPartition<>(stream,
partitionOffset.getKey());
+ long currentLag = 0L;
if
(KinesisSequenceNumber.isValidAWSKinesisSequence(partitionOffset.getValue())) {
- StreamPartition<String> partition = new StreamPartition<>(stream,
partitionOffset.getKey());
- long currentLag = getPartitionTimeLag(partition,
partitionOffset.getValue());
- partitionLag.put(partitionOffset.getKey(), currentLag);
+ currentLag = getPartitionTimeLag(partition,
partitionOffset.getValue());
}
+ partitionLag.put(partitionOffset.getKey(), currentLag);
}
return partitionLag;
}
@@ -832,6 +906,11 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Byt
* {@link #GET_SEQUENCE_NUMBER_RECORD_COUNT} records and return the first
sequence number from the result set.
* This method is thread safe as it does not depend on the internal state of
the supplier (it doesn't use the
* {@link PartitionResource} which have been assigned to the supplier), and
the Kinesis client is thread safe.
+ *
+ * When there are no records at the offset corresponding to the
ShardIteratorType,
+ * If shard is closed, return custom EOS sequence marker
+ * While getting the earliest sequence number, return a custom marker
corresponding to TRIM_HORIZON
+ * While getting the most recent sequence number, return a custom marker
corresponding to LATEST
*/
@Nullable
private String getSequenceNumber(StreamPartition<String> partition,
ShardIteratorType iteratorEnum)
@@ -840,62 +919,60 @@ public class KinesisRecordSupplier implements
RecordSupplier<String, String, Byt
String shardIterator =
kinesis.getShardIterator(partition.getStream(),
partition.getPartitionId(), iteratorEnum.toString())
.getShardIterator();
- long timeoutMillis = System.currentTimeMillis() +
fetchSequenceNumberTimeout;
- GetRecordsResult recordsResult = null;
-
- while (shardIterator != null && System.currentTimeMillis() <
timeoutMillis) {
-
- if (closed) {
- log.info("KinesisRecordSupplier closed while fetching
sequenceNumber");
- return null;
- }
- final String currentShardIterator = shardIterator;
- final GetRecordsRequest request = new
GetRecordsRequest().withShardIterator(currentShardIterator)
-
.withLimit(GET_SEQUENCE_NUMBER_RECORD_COUNT);
- recordsResult = RetryUtils.retry(
- () -> kinesis.getRecords(request),
- (throwable) -> {
- if (throwable instanceof ProvisionedThroughputExceededException)
{
- log.warn(
- throwable,
- "encountered ProvisionedThroughputExceededException while
fetching records, this means "
- + "that the request rate for the stream is too high, or
the requested data is too large for "
- + "the available throughput. Reduce the frequency or size
of your requests. Consider increasing "
- + "the number of shards to increase throughput."
- );
- return true;
- }
- if (throwable instanceof AmazonClientException) {
- AmazonClientException ase = (AmazonClientException) throwable;
- return AWSClientUtil.isClientExceptionRecoverable(ase);
- }
- return false;
- },
- GET_SEQUENCE_NUMBER_RETRY_COUNT
- );
- List<Record> records = recordsResult.getRecords();
+ if (closed) {
+ log.info("KinesisRecordSupplier closed while fetching sequenceNumber");
+ return null;
+ }
+ final GetRecordsRequest request = new
GetRecordsRequest().withShardIterator(shardIterator)
+
.withLimit(GET_SEQUENCE_NUMBER_RECORD_COUNT);
+ GetRecordsResult recordsResult = RetryUtils.retry(
+ () -> kinesis.getRecords(request),
+ (throwable) -> {
+ if (throwable instanceof ProvisionedThroughputExceededException) {
+ log.warn(
+ throwable,
+ "encountered ProvisionedThroughputExceededException while
fetching records, this means "
+ + "that the request rate for the stream is too high, or the
requested data is too large for "
+ + "the available throughput. Reduce the frequency or size of
your requests. Consider increasing "
+ + "the number of shards to increase throughput."
+ );
+ return true;
+ }
+ if (throwable instanceof AmazonClientException) {
+ AmazonClientException ase = (AmazonClientException) throwable;
+ return AWSClientUtil.isClientExceptionRecoverable(ase);
+ }
+ return false;
+ },
+ GET_SEQUENCE_NUMBER_RETRY_COUNT
+ );
- if (!records.isEmpty()) {
- return records.get(0).getSequenceNumber();
- }
+ List<Record> records = recordsResult.getRecords();
- shardIterator = recordsResult.getNextShardIterator();
+ if (!records.isEmpty()) {
+ return records.get(0).getSequenceNumber();
}
- if (shardIterator == null) {
- log.info("Partition[%s] returned a null shard iterator, is the shard
closed?", partition.getPartitionId());
+ if (recordsResult.getNextShardIterator() == null) {
+ log.info("Partition[%s] is closed and empty",
partition.getPartitionId());
return KinesisSequenceNumber.END_OF_SHARD_MARKER;
}
+ if (iteratorEnum.equals(ShardIteratorType.LATEST)) {
+ log.info("Partition[%s] has no records at LATEST offset",
partition.getPartitionId());
+ return KinesisSequenceNumber.UNREAD_LATEST;
+ }
- // if we reach here, it usually means either the shard has no more
records, or records have not been
- // added to this shard
- log.warn(
- "timed out while trying to fetch position for shard[%s],
millisBehindLatest is [%s], likely no more records in shard",
- partition.getPartitionId(),
- recordsResult != null ? recordsResult.getMillisBehindLatest() :
"UNKNOWN"
- );
+ // Even if there are records in the shard, they may not be returned on
the first call to getRecords with TRIM_HORIZON
+ // Reference:
https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html
+ // Section: GetRecords Returns Empty Records Array Even When There is
Data in the Stream
+ if (iteratorEnum.equals(ShardIteratorType.TRIM_HORIZON)) {
+ log.info("Partition[%s] has no records at TRIM_HORIZON offset",
partition.getPartitionId());
+ return KinesisSequenceNumber.UNREAD_TRIM_HORIZON;
+ }
+
+ log.warn("Could not fetch sequence number for Partition[%s]",
partition.getPartitionId());
return null;
});
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
index 278256553c..b4a84ef105 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
@@ -70,7 +70,6 @@ public class KinesisSamplerSpec extends
SeekableStreamSamplerSpec
tuningConfig.getRecordBufferSize(),
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
- tuningConfig.getFetchSequenceNumberTimeout(),
tuningConfig.getMaxRecordsPerPoll(),
ioConfig.isUseEarliestSequenceNumber(),
tuningConfig.isUseListShards()
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
index 46b6b7385b..ce5025238e 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java
@@ -51,6 +51,16 @@ public class KinesisSequenceNumber extends
OrderedSequenceNumber<String>
*/
public static final String EXPIRED_MARKER = "EXPIRED";
+ /**
+ * Indicates that records have not been read from a shard which needs to be
processed from sequence type: TRIM_HORIZON
+ */
+ public static final String UNREAD_TRIM_HORIZON = "UNREAD_TRIM_HORIZON";
+
+ /**
+ * Indicates that records have not been read from a shard which needs to be
processed from sequence type: LATEST
+ */
+ public static final String UNREAD_LATEST = "UNREAD_LATEST";
+
/**
* this flag is used to indicate either END_OF_SHARD_MARKER,
NO_END_SEQUENCE_NUMBER
* or EXPIRED_MARKER so that they can be properly compared
@@ -62,14 +72,12 @@ public class KinesisSequenceNumber extends
OrderedSequenceNumber<String>
private KinesisSequenceNumber(String sequenceNumber, boolean isExclusive)
{
super(sequenceNumber, isExclusive);
- if (END_OF_SHARD_MARKER.equals(sequenceNumber)
- || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)
- || EXPIRED_MARKER.equals(sequenceNumber)) {
- isMaxSequenceNumber = true;
+ if (!isValidAWSKinesisSequence(sequenceNumber)) {
+ isMaxSequenceNumber = !isUnreadSequence(sequenceNumber);
this.intSequence = null;
} else {
- isMaxSequenceNumber = false;
this.intSequence = new BigInteger(sequenceNumber);
+ this.isMaxSequenceNumber = false;
}
}
@@ -93,6 +101,8 @@ public class KinesisSequenceNumber extends
OrderedSequenceNumber<String>
return !(END_OF_SHARD_MARKER.equals(sequenceNumber)
|| NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)
|| EXPIRED_MARKER.equals(sequenceNumber)
+ || UNREAD_TRIM_HORIZON.equals(sequenceNumber)
+ || UNREAD_LATEST.equals(sequenceNumber)
);
}
@@ -100,14 +110,50 @@ public class KinesisSequenceNumber extends
OrderedSequenceNumber<String>
public int compareTo(OrderedSequenceNumber<String> o)
{
KinesisSequenceNumber num = (KinesisSequenceNumber) o;
+ if (isUnread() && num.isUnread()) {
+ return 0;
+ } else if (isUnread()) {
+ return -1;
+ } else if (num.isUnread()) {
+ return 1;
+ }
if (isMaxSequenceNumber && num.isMaxSequenceNumber) {
return 0;
} else if (isMaxSequenceNumber) {
return 1;
} else if (num.isMaxSequenceNumber) {
return -1;
- } else {
- return this.intSequence.compareTo(new BigInteger(o.get()));
}
+ return this.intSequence.compareTo(new BigInteger(o.get()));
+ }
+
+ @Override
+ public boolean isAvailableWithEarliest(OrderedSequenceNumber<String>
earliest)
+ {
+ if (isUnread()) {
+ return true;
+ }
+ return super.isAvailableWithEarliest(earliest);
+ }
+
+ @Override
+ public boolean isMoreToReadBeforeReadingRecord(OrderedSequenceNumber<String>
end, boolean isEndOffsetExclusive)
+ {
+ // Kinesis sequence number checks are exclusive for AWS numeric sequences
+ // However, If a record is UNREAD and the end offset is finalized to be
UNREAD, we have caught up. (inclusive)
+ if (isUnreadSequence(end.get())) {
+ return false;
+ }
+ return super.isMoreToReadBeforeReadingRecord(end, isEndOffsetExclusive);
+ }
+
+ public boolean isUnread()
+ {
+ return isUnreadSequence(get());
+ }
+
+ private boolean isUnreadSequence(String sequence)
+ {
+ return UNREAD_TRIM_HORIZON.equals(sequence) ||
UNREAD_LATEST.equals(sequence);
}
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index e8ee5a46c2..e506893034 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -212,7 +212,6 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String,
taskTuningConfig.getRecordBufferSize(),
taskTuningConfig.getRecordBufferOfferTimeout(),
taskTuningConfig.getRecordBufferFullWait(),
- taskTuningConfig.getFetchSequenceNumberTimeout(),
taskTuningConfig.getMaxRecordsPerPoll(),
ioConfig.isUseEarliestSequenceNumber(),
spec.getSpec().getTuningConfig().isUseListShards()
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
index 821e8c4687..57bae9f2a8 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
@@ -80,7 +80,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
null,
null,
null,
- null,
null
);
}
@@ -110,7 +109,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
@JsonProperty("recordBufferSize") Integer recordBufferSize,
@JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
@JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
- @JsonProperty("fetchSequenceNumberTimeout") Integer fetchSequenceNumberTimeout,
@JsonProperty("fetchThreads") Integer fetchThreads,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@@ -142,7 +140,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
recordBufferSize,
recordBufferOfferTimeout,
recordBufferFullWait,
- fetchSequenceNumberTimeout,
fetchThreads,
segmentWriteOutMediumFactory,
logParseExceptions,
@@ -257,7 +254,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
", recordBufferSize=" + getRecordBufferSize() +
", recordBufferOfferTimeout=" + getRecordBufferOfferTimeout() +
", recordBufferFullWait=" + getRecordBufferFullWait() +
- ", fetchSequenceNumberTimeout=" + getFetchSequenceNumberTimeout() +
", fetchThreads=" + getFetchThreads() +
", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() +
", logParseExceptions=" + isLogParseExceptions() +
@@ -293,7 +289,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
getRecordBufferSize(),
getRecordBufferOfferTimeout(),
getRecordBufferFullWait(),
- getFetchSequenceNumberTimeout(),
getFetchThreads(),
getSegmentWriteOutMediumFactory(),
isLogParseExceptions(),
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index 6a13ae003d..9d9b3dd417 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -72,7 +72,6 @@ public class KinesisIndexTaskSerdeTest
null,
null,
null,
- null,
null
);
private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig(
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 672dae3fd9..03b5be1bef 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2941,7 +2941,6 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
null,
null,
null,
- null,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 18437962dc..9aa1f6127b 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -79,7 +79,6 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(10000, config.getRecordBufferSize());
Assert.assertEquals(5000, config.getRecordBufferOfferTimeout());
Assert.assertEquals(5000, config.getRecordBufferFullWait());
- Assert.assertEquals(20000, config.getFetchSequenceNumberTimeout());
Assert.assertNull(config.getFetchThreads());
Assert.assertFalse(config.isSkipSequenceNumberAvailabilityCheck());
Assert.assertFalse(config.isResetOffsetAutomatically());
@@ -100,7 +99,6 @@ public class KinesisIndexTaskTuningConfigTest
+ " \"recordBufferSize\": 1000,\n"
+ " \"recordBufferOfferTimeout\": 500,\n"
+ " \"recordBufferFullWait\": 500,\n"
- + " \"fetchSequenceNumberTimeout\": 6000,\n"
+ " \"resetOffsetAutomatically\": false,\n"
+ " \"skipSequenceNumberAvailabilityCheck\": true,\n"
+ " \"fetchThreads\": 2,\n"
@@ -128,7 +126,6 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(1000, config.getRecordBufferSize());
Assert.assertEquals(500, config.getRecordBufferOfferTimeout());
Assert.assertEquals(500, config.getRecordBufferFullWait());
- Assert.assertEquals(6000, config.getFetchSequenceNumberTimeout());
Assert.assertEquals(2, (int) config.getFetchThreads());
Assert.assertTrue(config.isSkipSequenceNumberAvailabilityCheck());
Assert.assertFalse(config.isResetOffsetAutomatically());
@@ -156,7 +153,6 @@ public class KinesisIndexTaskTuningConfigTest
1000,
1000,
500,
- null,
42,
null,
false,
@@ -216,7 +212,6 @@ public class KinesisIndexTaskTuningConfigTest
1000,
1000,
500,
- null,
42,
null,
false,
@@ -268,7 +263,6 @@ public class KinesisIndexTaskTuningConfigTest
+ " \"recordBufferSize\": 1000,\n"
+ " \"recordBufferOfferTimeout\": 500,\n"
+ " \"recordBufferFullWait\": 500,\n"
- + " \"fetchSequenceNumberTimeout\": 6000,\n"
+ " \"resetOffsetAutomatically\": true,\n"
+ " \"skipSequenceNumberAvailabilityCheck\": true,\n"
+ " \"fetchThreads\": 2\n"
@@ -309,7 +303,6 @@ public class KinesisIndexTaskTuningConfigTest
1000,
500,
500,
- 6000,
2,
null,
null,
@@ -337,7 +330,6 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(1000, copy.getRecordBufferSize());
Assert.assertEquals(500, copy.getRecordBufferOfferTimeout());
Assert.assertEquals(500, copy.getRecordBufferFullWait());
- Assert.assertEquals(6000, copy.getFetchSequenceNumberTimeout());
Assert.assertEquals(2, (int) copy.getFetchThreads());
Assert.assertFalse(copy.isSkipSequenceNumberAvailabilityCheck());
Assert.assertTrue(copy.isResetOffsetAutomatically());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index bc58c1ff84..07ac822524 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -64,6 +64,8 @@ import java.util.stream.Collectors;
import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.END_OF_SHARD_MARKER;
import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.EXPIRED_MARKER;
import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER;
+import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.UNREAD_LATEST;
+import static org.apache.druid.indexing.kinesis.KinesisSequenceNumber.UNREAD_TRIM_HORIZON;
public class KinesisRecordSupplierTest extends EasyMockSupport
{
@@ -225,7 +227,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
5,
true,
false
@@ -285,7 +286,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
5,
true,
true
@@ -380,7 +380,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
100,
true,
false
@@ -469,7 +468,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
100,
true,
false
@@ -531,7 +529,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
100,
true,
false
@@ -611,7 +608,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
100,
true,
false
@@ -680,7 +676,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
100,
true,
false
@@ -716,7 +711,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
5,
true,
false
@@ -780,7 +774,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
1,
true,
false
@@ -874,7 +867,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
100,
true,
false
@@ -899,20 +891,22 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
}
@Test
- public void getLatestSequenceNumberWhenShardIsEmptyShouldReturnsNull()
+ public void getLatestSequenceNumberWhenShardIsEmptyShouldReturnUnreadToken()
{
- KinesisRecordSupplier recordSupplier = getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper();
- Assert.assertNull(recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
+ KinesisRecordSupplier recordSupplier = getSequenceNumberWhenNoRecordsHelperForOpenShard();
+ Assert.assertEquals(KinesisSequenceNumber.UNREAD_LATEST,
+ recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
verifyAll();
}
@Test
- public void getEarliestSequenceNumberWhenShardIsEmptyShouldReturnsNull()
+ public void getEarliestSequenceNumberWhenShardIsEmptyShouldReturnUnreadToken()
{
- KinesisRecordSupplier recordSupplier = getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper();
- Assert.assertNull(recordSupplier.getEarliestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
+ KinesisRecordSupplier recordSupplier = getSequenceNumberWhenNoRecordsHelperForOpenShard();
+ Assert.assertEquals(KinesisSequenceNumber.UNREAD_TRIM_HORIZON,
+ recordSupplier.getEarliestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
verifyAll();
}
@@ -949,7 +943,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 1000,
100,
true,
false
@@ -958,24 +951,23 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals("0", recordSupplier.getLatestSequenceNumber(StreamPartition.of(STREAM, SHARD_ID0)));
}
- private KinesisRecordSupplier getSequenceNumberWhenShardIsEmptyShouldReturnsNullHelper()
+ private KinesisRecordSupplier getSequenceNumberWhenNoRecordsHelperForOpenShard()
{
EasyMock.expect(kinesis.getShardIterator(
EasyMock.eq(STREAM),
EasyMock.eq(SHARD_ID0),
EasyMock.anyString()
)).andReturn(
- getShardIteratorResult0).anyTimes();
+ getShardIteratorResult0).times(1);
- EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes();
+ EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).times(1);
EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 1000)))
.andReturn(getRecordsResult0)
- .times(1, Integer.MAX_VALUE);
+ .times(1);
- EasyMock.expect(getRecordsResult0.getRecords()).andReturn(Collections.emptyList()).times(1, Integer.MAX_VALUE);
- EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(SHARD0_ITERATOR).times(1, Integer.MAX_VALUE);
- EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(0L).once();
+ EasyMock.expect(getRecordsResult0.getRecords()).andReturn(Collections.emptyList()).times(1);
+ EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(SHARD0_ITERATOR).times(1);
replayAll();
@@ -988,7 +980,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 1000,
100,
true,
false
@@ -1072,7 +1063,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
100,
true,
false
@@ -1100,12 +1090,19 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertEquals(SHARDS_LAG_MILLIS_EMPTY, independentTimeLag);
// Verify that kinesis apis are not called for custom sequence numbers
- for (String sequenceNum : Arrays.asList(NO_END_SEQUENCE_NUMBER, END_OF_SHARD_MARKER, EXPIRED_MARKER)) {
+ for (String sequenceNum : Arrays.asList(NO_END_SEQUENCE_NUMBER, END_OF_SHARD_MARKER, EXPIRED_MARKER,
+ UNREAD_LATEST, UNREAD_TRIM_HORIZON)) {
offsets = ImmutableMap.of(
SHARD_ID1, sequenceNum,
SHARD_ID0, sequenceNum
);
- Assert.assertEquals(Collections.emptyMap(), recordSupplier.getPartitionsTimeLag(STREAM, offsets));
+
+ Map<String, Long> zeroOffsets = ImmutableMap.of(
+ SHARD_ID1, 0L,
+ SHARD_ID0, 0L
+ );
+
+ Assert.assertEquals(zeroOffsets, recordSupplier.getPartitionsTimeLag(STREAM, offsets));
}
verifyAll();
}
@@ -1122,7 +1119,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
100,
5000,
5000,
- 60000,
5,
true,
false
@@ -1150,16 +1146,80 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNonNullNextIterator));
}
+ @Test
+ public void testIsOffsetAvailable()
+ {
+ AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
+ KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
+ recordsPerFetch,
+ 0,
+ 2,
+ false,
+ 100,
+ 5000,
+ 5000,
+ 5,
+ true,
+ false
+ );
+ StreamPartition<String> partition = new StreamPartition<>(STREAM, SHARD_ID0);
+
+ setupMockKinesisForShardId(mockKinesis, SHARD_ID0, null,
+ ShardIteratorType.AT_SEQUENCE_NUMBER, "-1",
+ Collections.emptyList(), "whatever");
+
+ Record record0 = new Record().withSequenceNumber("5");
+ setupMockKinesisForShardId(mockKinesis, SHARD_ID0, null,
+ ShardIteratorType.AT_SEQUENCE_NUMBER, "0",
+ Collections.singletonList(record0), "whatever");
+
+ Record record10 = new Record().withSequenceNumber("10");
+ setupMockKinesisForShardId(mockKinesis, SHARD_ID0, null,
+ ShardIteratorType.AT_SEQUENCE_NUMBER, "10",
+ Collections.singletonList(record10), "whatever");
+
+ EasyMock.replay(mockKinesis);
+
+ Assert.assertTrue(target.isOffsetAvailable(partition, KinesisSequenceNumber.of(UNREAD_TRIM_HORIZON)));
+
+ Assert.assertFalse(target.isOffsetAvailable(partition, KinesisSequenceNumber.of(END_OF_SHARD_MARKER)));
+
+ Assert.assertFalse(target.isOffsetAvailable(partition, KinesisSequenceNumber.of("-1")));
+
+ Assert.assertFalse(target.isOffsetAvailable(partition, KinesisSequenceNumber.of("0")));
+
+ Assert.assertTrue(target.isOffsetAvailable(partition, KinesisSequenceNumber.of("10")));
+ }
+
private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId,
- List<Record> expectedRecords, String expectedNextIterator)
+ List<Record> records, String nextIterator)
{
- String shardIteratorType = ShardIteratorType.TRIM_HORIZON.toString();
+ setupMockKinesisForShardId(kinesis, shardId, 1, ShardIteratorType.TRIM_HORIZON, null, records, nextIterator);
+ }
+
+ private void setupMockKinesisForShardId(AmazonKinesis kinesis, String shardId, Integer limit,
+ ShardIteratorType iteratorType, String sequenceNumber,
+ List<Record> records, String nextIterator)
+ {
+ String shardIteratorType = iteratorType.toString();
String shardIterator = "shardIterator" + shardId;
+ if (sequenceNumber != null) {
+ shardIterator += sequenceNumber;
+ }
GetShardIteratorResult shardIteratorResult = new GetShardIteratorResult().withShardIterator(shardIterator);
- EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType)).andReturn(shardIteratorResult).once();
- GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1);
- GetRecordsResult result = new GetRecordsResult().withRecords(expectedRecords)
- .withNextShardIterator(expectedNextIterator);
+ if (sequenceNumber == null) {
+ EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType))
+ .andReturn(shardIteratorResult)
+ .once();
+ } else {
+ EasyMock.expect(kinesis.getShardIterator(STREAM, shardId, shardIteratorType, sequenceNumber))
+ .andReturn(shardIteratorResult)
+ .once();
+ }
+ GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator)
+ .withLimit(limit);
+ GetRecordsResult result = new GetRecordsResult().withRecords(records)
+ .withNextShardIterator(nextIterator);
EasyMock.expect(kinesis.getRecords(request)).andReturn(result);
}
}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 8112de3c90..cf3e9d361e 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -199,7 +199,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- 5000,
null,
null,
null,
@@ -831,6 +830,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject()))
+ .andReturn(true)
+ .anyTimes();
Capture<KinesisIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -885,6 +887,15 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
+ // Any sequence number greater than or equal to 0 must be available
+ EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(),
+ EasyMock.eq(KinesisSequenceNumber.of("101"))))
+ .andReturn(true)
+ .anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(),
+ EasyMock.eq(KinesisSequenceNumber.of("-1"))))
+ .andReturn(false)
+ .anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
@@ -2758,6 +2769,19 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("400").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
+ // Only sequence numbers >= 300 are available
+ EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(),
+ EasyMock.eq(KinesisSequenceNumber.of("400"))))
+ .andReturn(true)
+ .anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(),
+ EasyMock.eq(KinesisSequenceNumber.of("200"))))
+ .andReturn(false)
+ .anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(),
+ EasyMock.eq(KinesisSequenceNumber.of("100"))))
+ .andReturn(false)
+ .anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -3938,7 +3962,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- 5000,
null,
null,
null,
@@ -4073,6 +4096,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject()))
+ .andReturn(true)
+ .anyTimes();
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -4356,6 +4382,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject()))
+ .andReturn(true)
+ .anyTimes();
Capture<Task> postSplitCaptured = Capture.newInstance(CaptureType.ALL);
@@ -4789,6 +4818,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(new StreamPartition<>(STREAM, SHARD_ID2)))
.andReturn("200").anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject()))
+ .andReturn(true)
+ .anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
@@ -5043,7 +5075,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- 5000,
null,
null,
null,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
index ed3a20cf20..c6ce08ab2a 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
@@ -56,7 +56,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
@JsonProperty("recordBufferSize") Integer recordBufferSize,
@JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
@JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
- @JsonProperty("fetchSequenceNumberTimeout") Integer fetchSequenceNumberTimeout,
@JsonProperty("fetchThreads") Integer fetchThreads,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@@ -86,7 +85,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
recordBufferSize,
recordBufferOfferTimeout,
recordBufferFullWait,
- fetchSequenceNumberTimeout,
fetchThreads,
segmentWriteOutMediumFactory,
logParseExceptions,
@@ -119,7 +117,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
base.getRecordBufferSize(),
base.getRecordBufferOfferTimeout(),
base.getRecordBufferFullWait(),
- base.getFetchSequenceNumberTimeout(),
base.getFetchThreads(),
base.getSegmentWriteOutMediumFactory(),
base.isLogParseExceptions(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index dbc2857336..cc6d8e61e6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1253,10 +1253,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
final SequenceOffsetType endSequenceNumber
)
{
- final int compareToEnd = createSequenceNumber(recordSequenceNumber)
- .compareTo(createSequenceNumber(endSequenceNumber));
-
- return isEndOffsetExclusive() ? compareToEnd < 0 : compareToEnd <= 0;
+ return createSequenceNumber(recordSequenceNumber).isMoreToReadBeforeReadingRecord(
+ createSequenceNumber(endSequenceNumber),
+ isEndOffsetExclusive()
+ );
}
/**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
index 74fd08d445..cbc61edbe6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
@@ -81,4 +81,24 @@ public abstract class OrderedSequenceNumber<SequenceOffsetType>
", isExclusive=" + isExclusive +
'}';
}
+
+ public boolean isAvailableWithEarliest(OrderedSequenceNumber<SequenceOffsetType> earliest)
+ {
+ return earliest.compareTo(this) <= 0;
+ }
+
+ /**
+ * Returns true if, given that we want to start reading from this sequence number and stop at the sequence number end,
+ * there is more left to read. Used in pre-read checks to determine if there is anything left to read.
+ *
+ * @param end the end offset of the partition for a given task
+ * @param isEndOffsetExclusive indicates if the TaskRunner considers the end offsets to be exclusive
+ * @return true if more records need to be read given that this is the current sequence number
+ */
+ public boolean isMoreToReadBeforeReadingRecord(OrderedSequenceNumber<SequenceOffsetType> end,
+ boolean isEndOffsetExclusive)
+ {
+ final int compareToEnd = this.compareTo(end);
+ return isEndOffsetExclusive ? compareToEnd < 0 : compareToEnd <= 0;
+ }
}
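The end-offset check added above can be illustrated with a standalone sketch over plain longs (hypothetical names, not the Druid classes): with an exclusive end offset, reaching the end means nothing is left, while with Kinesis-style inclusive ends, current == end still leaves one more record to read.

```java
// Minimal sketch of OrderedSequenceNumber#isMoreToReadBeforeReadingRecord,
// modeled with plain longs instead of generic sequence types.
public class EndOffsetCheckSketch {
    static boolean isMoreToReadBeforeReadingRecord(long current, long end, boolean isEndOffsetExclusive) {
        final int compareToEnd = Long.compare(current, end);
        // Exclusive ends stop strictly before end; inclusive ends include it.
        return isEndOffsetExclusive ? compareToEnd < 0 : compareToEnd <= 0;
    }
}
```

This is why the Kinesis override above must special-case UNREAD end tokens: ordering alone would report "more to read" whenever the end offset is still an UNREAD marker.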
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
index 7487892b49..df2b5c943b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
@@ -105,6 +105,14 @@ public interface RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType
@Nullable
SequenceOffsetType getEarliestSequenceNumber(StreamPartition<PartitionIdType> partition);
+ /**
+ * Checks if a provided offset is still available for a given partition in the stream
+ * @param partition stream partition to check in
+ * @param offset offset to be checked
+ * @return availability of offset
+ */
+ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
+ OrderedSequenceNumber<SequenceOffsetType> offset);
/**
* returns the sequence number of the next record
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index e4445f20a2..59526028a8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -3916,9 +3916,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@NotNull SequenceOffsetType offsetFromMetadata
)
{
- final SequenceOffsetType earliestOffset = getOffsetFromStreamForPartition(partition, true);
- return earliestOffset != null
- && makeSequenceNumber(earliestOffset).compareTo(makeSequenceNumber(offsetFromMetadata)) <= 0;
+ StreamPartition<PartitionIdType> streamPartition = StreamPartition.of(ioConfig.getStream(), partition);
+ OrderedSequenceNumber<SequenceOffsetType> sequenceNumber = makeSequenceNumber(offsetFromMetadata);
+ recordSupplierLock.lock();
+ try {
+ if (!recordSupplier.getAssignment().contains(streamPartition)) {
+ // this shouldn't happen, but in case it does... throw inside try so the lock is released in finally
+ throw new IllegalStateException("Record supplier does not match current known partitions");
+ }
+ return recordSupplier.isOffsetAvailable(streamPartition, sequenceNumber);
+ }
+ finally {
+ recordSupplierLock.unlock();
+ }
}
protected void emitNoticeProcessTime(String noticeType, long timeInMillis)
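The supervisor change above delegates the availability check to the record supplier while holding a lock, releasing it in a finally block even when the partition-assignment precondition fails. A simplified standalone analogue (hypothetical class and parameters, not the Druid supervisor):

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a lock-guarded availability check: the lock is
// always released via finally, including on the precondition-failure path.
public class GuardedOffsetCheck {
    private final ReentrantLock recordSupplierLock = new ReentrantLock();

    public boolean isOffsetAvailable(boolean partitionAssigned, boolean offsetAvailable) {
        recordSupplierLock.lock();
        try {
            if (!partitionAssigned) {
                // this shouldn't happen, but in case it does...
                throw new IllegalStateException("Record supplier does not match current known partitions");
            }
            return offsetAvailable;
        } finally {
            recordSupplierLock.unlock();
        }
    }
}
```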
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
index 89a4642d09..8244f3dcc4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
@@ -44,6 +44,7 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.seekablestream.RecordSupplierInputSource;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -1601,6 +1602,12 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
return null;
}
+ @Override
+ public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Long> offset)
+ {
+ return true;
+ }
+
@Override
public Long getPosition(StreamPartition<Integer> partition)
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
index 036f230532..eb3a083527 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.DateTimes;
@@ -214,6 +215,12 @@ public class RecordSupplierInputSourceTest extends InitializedNullHandlingTest
throw new UnsupportedOperationException();
}
+ @Override
+ public boolean isOffsetAvailable(StreamPartition<Integer> partition, OrderedSequenceNumber<Integer> offset)
+ {
+ return true;
+ }
+
@Override
public Integer getPosition(StreamPartition<Integer> partition)
{
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
index f0c66fcb8b..f27e016a3c 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
@@ -111,4 +111,9 @@ public class KinesisEventWriter implements StreamEventWriter
"Waiting for all Kinesis writes to be flushed"
);
}
+
+ protected KinesisProducer getKinesisProducer()
+ {
+ return kinesisProducer;
+ }
}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisSingleShardEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisSingleShardEventWriter.java
new file mode 100644
index 0000000000..31f437cc4f
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisSingleShardEventWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Writes only to a single shard
+ */
+public class KinesisSingleShardEventWriter extends KinesisEventWriter
+{
+
+ public KinesisSingleShardEventWriter(String endpoint, boolean aggregate) throws Exception
+ {
+ super(endpoint, aggregate);
+ }
+
+ @Override
+ public void write(String streamName, byte[] event)
+ {
+ getKinesisProducer().addUserRecord(
+ streamName,
+ "0",
+ ByteBuffer.wrap(event)
+ );
+ }
+}
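For context on why the constant partition key `"0"` above leaves other shards empty: Kinesis routes each record by taking the MD5 of its partition key as a 128-bit unsigned integer and placing it in the shard whose hash-key range contains that value, so a fixed key always lands on the same shard. A standalone sketch of that routing (the evenly split shard ranges here are an assumption for illustration, not a real stream's configuration):

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class ShardRoutingSketch
{
  // MD5 of the partition key, interpreted as an unsigned 128-bit integer.
  static BigInteger hashKey(String partitionKey) throws Exception
  {
    byte[] md5 = MessageDigest.getInstance("MD5")
        .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
    return new BigInteger(1, md5);
  }

  // Pick a shard index, assuming hash-key ranges split [0, 2^128) evenly.
  static int shardFor(String partitionKey, int shardCount) throws Exception
  {
    BigInteger span = BigInteger.ONE.shiftLeft(128).divide(BigInteger.valueOf(shardCount));
    return hashKey(partitionKey).divide(span).intValue();
  }

  public static void main(String[] args) throws Exception
  {
    // Every record written with key "0" routes to the same shard,
    // leaving the remaining shards of the stream empty.
    int first = shardFor("0", 4);
    for (int i = 0; i < 10; i++) {
      System.out.println(shardFor("0", 4) == first); // always true
    }
  }
}
```

This is exactly the condition the new integration tests need: a multi-shard stream where some shards receive no records at all.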
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceEmptyShardsTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceEmptyShardsTest.java
new file mode 100644
index 0000000000..3228842ec1
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceEmptyShardsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.utils.KinesisSingleShardEventWriter;
+import org.apache.druid.testing.utils.StreamEventWriter;
+
+import javax.annotation.Nullable;
+
+public abstract class AbstractKinesisIndexingServiceEmptyShardsTest extends AbstractKinesisIndexingServiceTest
+{
+ private static final Logger LOG = new Logger(AbstractKinesisIndexingServiceEmptyShardsTest.class);
+
+ @Override
+ StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, @Nullable Boolean transactionEnabled)
+ throws Exception
+ {
+ if (transactionEnabled != null) {
+ LOG.warn(
+ "Kinesis event writer doesn't support transaction. Ignoring the given parameter transactionEnabled[%s]",
+ transactionEnabled
+ );
+ }
+ return new KinesisSingleShardEventWriter(config.getStreamEndpoint(), false);
+ }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceEmptyShardsSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceEmptyShardsSerializedTest.java
new file mode 100644
index 0000000000..ef005b2d5c
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceEmptyShardsSerializedTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+/**
+ * Variant of ITKinesisIndexingServiceSerializedTest where the event writer publishes to a single shard
+ * and there may be empty shards as a result
+ */
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceEmptyShardsSerializedTest extends AbstractKinesisIndexingServiceEmptyShardsTest
+{
+ @Override
+ public String getTestNamePrefix()
+ {
+ return "kinesis_emptyShards_serialized";
+ }
+
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ doBeforeClass();
+ }
+
+ /**
+ * This test must be run individually due to its resource consumption requirements (task slots, memory, etc.)
+ */
+ @Test
+ public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
+ {
+ doTestIndexDataWithStartStopSupervisor(null);
+ }
+
+ /**
+ * This test must be run individually due to its resource consumption requirements (task slots, memory, etc.)
+ */
+ @Test
+ public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
+ {
+ doTestIndexDataWithStreamReshardSplit(null);
+ }
+
+ /**
+ * This test must be run individually due to its resource consumption requirements (task slots, memory, etc.)
+ */
+ @Test
+ public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception
+ {
+ doTestIndexDataWithStreamReshardMerge();
+ }
+}
diff --git a/web-console/src/druid-models/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec.tsx
index fd7a92a1b4..29881dfe0a 100644
--- a/web-console/src/druid-models/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec.tsx
@@ -1377,7 +1377,6 @@ export interface TuningConfig {
recordBufferSize?: number;
recordBufferOfferTimeout?: number;
recordBufferFullWait?: number;
- fetchSequenceNumberTimeout?: number;
fetchThreads?: number;
}
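The `fetchSequenceNumberTimeout` option removed above existed because obtaining a sequence number from an empty shard could block and retry indefinitely. With the UNREAD tokens it is no longer needed: per the commit message, the token is stored as the shard's offset until a record yields a real sequence number, and Kinesis end sequences remain inclusive (current == end means there is still a record to read). A rough standalone sketch of that behavior, with illustrative names rather than the actual Druid API:

```java
import java.math.BigInteger;
import java.util.function.Supplier;

public class LazySequenceSketch
{
  static final String UNREAD_TRIM_HORIZON = "UNREAD_TRIM_HORIZON";

  // pollFirstSequence stands in for a non-blocking read via a shardIterator;
  // it returns null while the shard is still empty.
  static String resolveOffset(String currentOffset, Supplier<String> pollFirstSequence)
  {
    if (!UNREAD_TRIM_HORIZON.equals(currentOffset)) {
      return currentOffset; // already a real sequence number
    }
    String seq = pollFirstSequence.get();
    // Keep storing the token until a record yields a valid sequence number.
    return seq != null ? seq : currentOffset;
  }

  // Kinesis end sequences are inclusive: current == end means more to read.
  static boolean moreToRead(BigInteger current, BigInteger end)
  {
    return current.compareTo(end) <= 0;
  }

  public static void main(String[] args)
  {
    System.out.println(resolveOffset(UNREAD_TRIM_HORIZON, () -> null)); // stays UNREAD_TRIM_HORIZON
    System.out.println(resolveOffset(UNREAD_TRIM_HORIZON, () -> "42")); // resolves to 42
    System.out.println(moreToRead(new BigInteger("7"), new BigInteger("7"))); // true
  }
}
```

The key difference from the old behavior: no call here ever waits on an empty shard, so there is nothing left to time out.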
@@ -2023,21 +2022,6 @@ const TUNING_FORM_FIELDS: Field<IngestionSpec>[] = [
</>
),
},
- {
- name: 'spec.tuningConfig.fetchSequenceNumberTimeout',
- type: 'number',
- defaultValue: 60000,
- defined: typeIs('kinesis'),
- hideInMore: true,
- info: (
- <>
- Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence
- number for a shard. Kinesis will not return the latest sequence number if no data is
- actively being written to that shard. In this case, this fetch call will repeatedly timeout
- and retry until fresh data is written to the stream.
- </>
- ),
- },
{
name: 'spec.tuningConfig.fetchThreads',
type: 'number',
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]