abhishekagarwal87 commented on code in PR #12792:
URL: https://github.com/apache/druid/pull/12792#discussion_r937333947
##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -300,7 +300,6 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param
|`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (default == 10000)|
|`recordBufferOfferTimeout`|Integer|Length of time in milliseconds to wait for space to become available in the buffer before timing out.| no (default == 5000)|
|`recordBufferFullWait`|Integer|Length of time in milliseconds to wait for the buffer to drain before attempting to fetch records from Kinesis again.|no (default == 5000)|
-|`fetchSequenceNumberTimeout`|Integer|Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence number for a shard. Kinesis will not return the latest sequence number if no data is actively being written to that shard. In this case, this fetch call will repeatedly timeout and retry until fresh data is written to the stream.|no (default == 60000)|
Review Comment:
what happens if someone has this parameter in their ingestion spec?
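One possible answer to this question is to keep accepting the removed key and ignore it with a warning, so that existing specs keep loading. The sketch below is purely illustrative and uses only the JDK. `TuningConfigSketch`, `REMOVED_KEYS`, and the `Map`-based parsing are hypothetical. Druid's real `tuningConfig` is bound through Jackson, so whether an unrecognized key is silently ignored or rejected depends on the ObjectMapper's `DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES` setting and on any `@JsonIgnoreProperties` annotations on the class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, JDK-only stand-in for a tuning config parser; NOT Druid's actual class.
class TuningConfigSketch
{
  // Assumption: keys that were once documented but are no longer read.
  static final Set<String> REMOVED_KEYS = Set.of("fetchSequenceNumberTimeout");

  final int recordBufferSize;
  final List<String> warnings = new ArrayList<>();

  TuningConfigSketch(Map<String, Object> spec)
  {
    Map<String, Object> remaining = new HashMap<>(spec);
    for (String key : REMOVED_KEYS) {
      if (remaining.remove(key) != null) {
        // Tolerate the removed key so older specs still load, but record a warning.
        warnings.add("Ignoring removed property [" + key + "]");
      }
    }
    // Default taken from the documented recordBufferSize default (10000).
    Object size = remaining.remove("recordBufferSize");
    this.recordBufferSize = size == null ? 10000 : ((Number) size).intValue();
    if (!remaining.isEmpty()) {
      // Any other unrecognized key is still rejected, which catches typos.
      throw new IllegalArgumentException("Unknown properties: " + remaining.keySet());
    }
  }
}
```

Tolerating only an explicit list of removed keys keeps old specs working while still failing loudly on genuine typos.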
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -128,8 +128,7 @@ protected void possiblyResetDataSourceMetadata(
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
String earliestSequenceNumber = recordSupplier.getEarliestSequenceNumber(streamPartition);
Review Comment:
can we pass this to the `isOffsetAvailable` method?
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java:
##########
@@ -93,21 +101,59 @@ public static boolean isValidAWSKinesisSequence(String sequenceNumber)
return !(END_OF_SHARD_MARKER.equals(sequenceNumber)
|| NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)
|| EXPIRED_MARKER.equals(sequenceNumber)
+ || UNREAD_TRIM_HORIZON.equals(sequenceNumber)
+ || UNREAD_LATEST.equals(sequenceNumber)
);
}
@Override
public int compareTo(OrderedSequenceNumber<String> o)
{
KinesisSequenceNumber num = (KinesisSequenceNumber) o;
+ if (isUnread() && num.isUnread()) {
Review Comment:
shouldn't we fall back to comparing `maxSequenceNumber` in this case as well?
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -128,8 +128,7 @@ protected void possiblyResetDataSourceMetadata(
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
String earliestSequenceNumber = recordSupplier.getEarliestSequenceNumber(streamPartition);
Review Comment:
since it's an expensive call.
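The two comments on `possiblyResetDataSourceMetadata` suggest fetching the earliest sequence number once per partition and handing it to the availability check, rather than letting the check fetch it again. The JDK-only sketch below assumes hypothetical names (`EarliestSupplier`, `OffsetCheckSketch`, and an `isOffsetAvailable` overload that takes the pre-fetched value); Druid's real signatures may differ. It also assumes stored offsets are plain numeric sequence numbers rather than markers such as `NO_END_SEQUENCE_NUMBER`.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the record supplier; each call is assumed to hit Kinesis and be expensive.
interface EarliestSupplier
{
  String getEarliestSequenceNumber(String partitionId);
}

// Hypothetical helper; names and signatures are illustrative, not Druid's.
class OffsetCheckSketch
{
  // The caller passes in the earliest sequence number it already fetched,
  // so this check never calls the supplier itself.
  static boolean isOffsetAvailable(String offset, String earliestSequenceNumber)
  {
    if (earliestSequenceNumber == null) {
      // Assumption: if the earliest position is unknown, treat the offset as available.
      return true;
    }
    // Kinesis sequence numbers are decimal strings too large for a long, so compare as BigInteger.
    return new BigInteger(offset).compareTo(new BigInteger(earliestSequenceNumber)) >= 0;
  }

  // Returns the assigned partitions whose stored offset has fallen behind the earliest available record.
  static List<String> findUnavailable(
      List<String> assignment,
      Map<String, String> currOffsets,
      EarliestSupplier supplier
  )
  {
    List<String> unavailable = new ArrayList<>();
    for (String partitionId : assignment) {
      String sequence = currOffsets.get(partitionId);
      // One remote call per partition; the result is reused by the check below.
      String earliest = supplier.getEarliestSequenceNumber(partitionId);
      if (sequence != null && !isOffsetAvailable(sequence, earliest)) {
        unavailable.add(partitionId);
      }
    }
    return unavailable;
  }
}
```

With this shape the supplier is called once per assigned partition instead of twice (once in the loop and again inside the check).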
--
This is an automated message from the Apache Git Service.