abhishekagarwal87 commented on code in PR #12792:
URL: https://github.com/apache/druid/pull/12792#discussion_r937333947


##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -300,7 +300,6 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param
 |`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (default == 10000)|
 |`recordBufferOfferTimeout`|Integer|Length of time in milliseconds to wait for space to become available in the buffer before timing out.| no (default == 5000)|
 |`recordBufferFullWait`|Integer|Length of time in milliseconds to wait for the buffer to drain before attempting to fetch records from Kinesis again.|no (default == 5000)|
-|`fetchSequenceNumberTimeout`|Integer|Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence number for a shard. Kinesis will not return the latest sequence number if no data is actively being written to that shard. In this case, this fetch call will repeatedly timeout and retry until fresh data is written to the stream.|no (default == 60000)|

Review Comment:
   What happens if someone has this parameter in their ingestion spec?
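For context on the question above: Druid binds `tuningConfig` with Jackson, so whether a spec that still sets `fetchSequenceNumberTimeout` is rejected depends on how the config class and `ObjectMapper` treat unknown properties (`DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES`, `@JsonIgnoreProperties(ignoreUnknown = true)`), which this hunk does not show. If the intent is that old specs keep working, one option is to keep accepting the key and ignore it with a warning. The plain-Java sketch below models that with a `Map`-based spec; `TuningConfigSketch`, `fromSpec`, and `DEPRECATED_KEYS` are hypothetical names, not Druid code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, NOT Druid's KinesisIndexTaskTuningConfig: one
// backward-compatible way to treat a tuning-config key that was removed.
final class TuningConfigSketch
{
  // Keys that are still accepted but no longer have any effect (assumption).
  static final Set<String> DEPRECATED_KEYS = Set.of("fetchSequenceNumberTimeout");

  final int recordBufferSize;
  final List<String> warnings;

  private TuningConfigSketch(int recordBufferSize, List<String> warnings)
  {
    this.recordBufferSize = recordBufferSize;
    this.warnings = warnings;
  }

  static TuningConfigSketch fromSpec(Map<String, Object> spec)
  {
    List<String> warnings = new ArrayList<>();
    for (String key : spec.keySet()) {
      if (DEPRECATED_KEYS.contains(key)) {
        // The old spec still parses; the value is simply not used any more.
        warnings.add("Ignoring deprecated tuningConfig property [" + key + "]");
      }
    }
    // Default mirrors the documented recordBufferSize default of 10000.
    Object size = spec.getOrDefault("recordBufferSize", 10000);
    return new TuningConfigSketch(((Number) size).intValue(), Collections.unmodifiableList(warnings));
  }
}
```

The alternative, failing fast on the removed key, is stricter but would break existing supervisor specs on upgrade, which is presumably what the question is probing.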



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -128,8 +128,7 @@ protected void possiblyResetDataSourceMetadata(
       for (final StreamPartition<String> streamPartition : assignment) {
         String sequence = currOffsets.get(streamPartition.getPartitionId());
         String earliestSequenceNumber = recordSupplier.getEarliestSequenceNumber(streamPartition);

Review Comment:
   Can we pass this to the `isOffsetAvailable` method?



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java:
##########
@@ -93,21 +101,59 @@ public static boolean isValidAWSKinesisSequence(String sequenceNumber)
     return !(END_OF_SHARD_MARKER.equals(sequenceNumber)
              || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)
              || EXPIRED_MARKER.equals(sequenceNumber)
+             || UNREAD_TRIM_HORIZON.equals(sequenceNumber)
+             || UNREAD_LATEST.equals(sequenceNumber)
       );
   }
 
   @Override
   public int compareTo(OrderedSequenceNumber<String> o)
   {
     KinesisSequenceNumber num = (KinesisSequenceNumber) o;
+    if (isUnread() && num.isUnread()) {

Review Comment:
   Shouldn't we fall back to comparing `maxSequenceNumber` in this case as well?
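One reading of the suggestion, as a hypothetical sketch: run the max-sequence-number checks before the unread shortcut, so an unread marker compared against an end-of-shard or no-end marker still sorts below it, and only apply the unread rules when neither side is a max marker. `SeqSketch`, its marker strings, and the rule that unread markers sort before real sequence numbers are assumptions; the real `KinesisSequenceNumber` may order these differently.

```java
import java.math.BigInteger;

// Hypothetical, simplified model of a Kinesis sequence number. Marker values
// and the ordering (unread < numeric < max) are assumptions for illustration.
final class SeqSketch implements Comparable<SeqSketch>
{
  static final String END_OF_SHARD_MARKER = "EOS";
  static final String NO_END_SEQUENCE_NUMBER = "NO_END";
  static final String UNREAD_TRIM_HORIZON = "UNREAD_TRIM_HORIZON";
  static final String UNREAD_LATEST = "UNREAD_LATEST";

  final String value;
  final boolean isMaxSequenceNumber;
  final boolean isUnread;

  SeqSketch(String value)
  {
    this.value = value;
    this.isMaxSequenceNumber = END_OF_SHARD_MARKER.equals(value) || NO_END_SEQUENCE_NUMBER.equals(value);
    this.isUnread = UNREAD_TRIM_HORIZON.equals(value) || UNREAD_LATEST.equals(value);
  }

  @Override
  public int compareTo(SeqSketch o)
  {
    // Max markers are checked first, so they win regardless of the other side.
    if (isMaxSequenceNumber && o.isMaxSequenceNumber) {
      return 0;
    } else if (isMaxSequenceNumber) {
      return 1;
    } else if (o.isMaxSequenceNumber) {
      return -1;
    }
    // Only now apply the unread rules: unread sorts before any real sequence.
    if (isUnread && o.isUnread) {
      return 0;
    } else if (isUnread) {
      return -1;
    } else if (o.isUnread) {
      return 1;
    }
    // Both are real sequence numbers: compare numerically.
    return new BigInteger(value).compareTo(new BigInteger(o.value));
  }
}
```

Ordering the branches this way keeps `compareTo` antisymmetric across every marker pair, which matters if these values are ever sorted or used as map keys.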



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -128,8 +128,7 @@ protected void possiblyResetDataSourceMetadata(
       for (final StreamPartition<String> streamPartition : assignment) {
         String sequence = currOffsets.get(streamPartition.getPartitionId());
         String earliestSequenceNumber = recordSupplier.getEarliestSequenceNumber(streamPartition);

Review Comment:
   Since it's an expensive call.
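A sketch of the refactor the two comments above ask for: fetch the earliest sequence number once per partition and hand it to the availability check, instead of letting the check call `getEarliestSequenceNumber` again. `SupplierSketch`, `ResetCheckSketch`, the two-argument `isOffsetAvailable`, and the "offset >= earliest" availability rule are assumptions for illustration; the call counter only makes the saved round trip visible.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, NOT Druid's RecordSupplier.
final class SupplierSketch
{
  private final Map<String, String> earliestByShard;
  int remoteCalls = 0; // counts simulated Kinesis round trips

  SupplierSketch(Map<String, String> earliestByShard)
  {
    this.earliestByShard = earliestByShard;
  }

  String getEarliestSequenceNumber(String shardId)
  {
    remoteCalls++; // in the real supplier this is a network call, hence expensive
    return earliestByShard.get(shardId);
  }
}

final class ResetCheckSketch
{
  // Takes the already-fetched earliest sequence number instead of fetching it again.
  // A null earliest value is treated as "not available" in this sketch (assumption).
  static boolean isOffsetAvailable(String offset, String earliest)
  {
    return earliest != null && new BigInteger(offset).compareTo(new BigInteger(earliest)) >= 0;
  }

  static Map<String, Boolean> check(SupplierSketch supplier, Map<String, String> currOffsets)
  {
    Map<String, Boolean> result = new HashMap<>();
    for (Map.Entry<String, String> e : currOffsets.entrySet()) {
      // One fetch per shard, reused by the availability check.
      String earliest = supplier.getEarliestSequenceNumber(e.getKey());
      result.put(e.getKey(), isOffsetAvailable(e.getValue(), earliest));
    }
    return result;
  }
}
```

With two shards, `check` makes exactly two simulated remote calls, one per shard, rather than one for the reset logic plus another inside the availability check.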



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
