This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d595346 Ignore shardIterator type when sequence number is available (#7592)
d595346 is described below
commit d5953465f0c1a96829b0271b79ae0e55e7a4bf33
Author: Kartik Khare <[email protected]>
AuthorDate: Wed Oct 20 22:12:51 2021 +0530
Ignore shardIterator type when sequence number is available (#7592)
---
.../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 7bb7dcd..7f2557e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -197,12 +197,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
private String getShardIterator(String shardId, String sequenceNumber) {
GetShardIteratorRequest.Builder requestBuilder =
- GetShardIteratorRequest.builder().streamName(_streamTopicName).shardId(shardId)
- .shardIteratorType(_shardIteratorType);
+ GetShardIteratorRequest.builder().streamName(_streamTopicName).shardId(shardId);
- if (sequenceNumber != null && (_shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER) || _shardIteratorType
- .equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER))) {
- requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber);
+ if (sequenceNumber != null) {
+ requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber)
+ .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+ } else {
+ requestBuilder = requestBuilder.shardIteratorType(_shardIteratorType);
}
return _kinesisClient.getShardIterator(requestBuilder.build()).shardIterator();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]