This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d595346  Ignore shardIterator type when sequence number is available (#7592)
d595346 is described below

commit d5953465f0c1a96829b0271b79ae0e55e7a4bf33
Author: Kartik Khare <[email protected]>
AuthorDate: Wed Oct 20 22:12:51 2021 +0530

    Ignore shardIterator type when sequence number is available (#7592)
---
 .../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java   | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 7bb7dcd..7f2557e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -197,12 +197,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
 
   private String getShardIterator(String shardId, String sequenceNumber) {
     GetShardIteratorRequest.Builder requestBuilder =
-        
GetShardIteratorRequest.builder().streamName(_streamTopicName).shardId(shardId)
-            .shardIteratorType(_shardIteratorType);
+        
GetShardIteratorRequest.builder().streamName(_streamTopicName).shardId(shardId);
 
-    if (sequenceNumber != null && 
(_shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER) || 
_shardIteratorType
-        .equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER))) {
-      requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber);
+    if (sequenceNumber != null) {
+      requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber)
+          .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+    } else {
+      requestBuilder = requestBuilder.shardIteratorType(_shardIteratorType);
     }
 
     return 
_kinesisClient.getShardIterator(requestBuilder.build()).shardIterator();

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to