This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 48a96f5d06b Better automatic offset reset for Kinesis ingestion 
(#15338)
48a96f5d06b is described below

commit 48a96f5d06bf4b4d1532b0ec96eb631b4287e30d
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Dec 13 12:03:17 2023 +0530

    Better automatic offset reset for Kinesis ingestion (#15338)
    
    Better automatic offset reset for Kinesis ingestion
---
 .../indexing/kinesis/KinesisIndexTaskRunner.java   | 49 ++++++++++++++--------
 1 file changed, 31 insertions(+), 18 deletions(-)

diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index 22448ba67a0..75f23da0e1f 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -42,12 +42,12 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
 
 public class KinesisIndexTaskRunner extends 
SeekableStreamIndexTaskRunner<String, String, ByteEntity>
 {
@@ -125,28 +125,42 @@ public class KinesisIndexTaskRunner extends 
SeekableStreamIndexTaskRunner<String
   {
     if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
       final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+      final Map<StreamPartition<String>, String> partitionToSequenceResetMap = 
new HashMap<>();
       for (final StreamPartition<String> streamPartition : assignment) {
         String sequence = currOffsets.get(streamPartition.getPartitionId());
         if (!recordSupplier.isOffsetAvailable(streamPartition, 
KinesisSequenceNumber.of(sequence))) {
-          if (task.getTuningConfig().isResetOffsetAutomatically()) {
-            log.info("Attempting to reset sequences automatically for all 
partitions");
-            try {
-              sendResetRequestAndWait(
-                  assignment.stream()
-                            .collect(Collectors.toMap(x -> x, x -> 
currOffsets.get(x.getPartitionId()))),
-                  toolbox
-              );
-            }
-            catch (IOException e) {
-              throw new ISE(e, "Exception while attempting to automatically 
reset sequences");
-            }
-          } else {
+          partitionToSequenceResetMap.put(streamPartition, sequence);
+        }
+      }
+
+      if (!partitionToSequenceResetMap.isEmpty()) {
+        for (Map.Entry<StreamPartition<String>, String> partitionToSequence : 
partitionToSequenceResetMap.entrySet()) {
+          log.warn("Starting sequenceNumber[%s] is no longer available for 
partition[%s].",
+                   partitionToSequence.getValue(),
+                   partitionToSequence.getKey()
+          );
+        }
+        if (task.getTuningConfig().isResetOffsetAutomatically()) {
+          log.info(
+              "Attempting to reset offsets for [%d] partitions with ids[%s].",
+              partitionToSequenceResetMap.size(),
+              partitionToSequenceResetMap.keySet()
+          );
+          try {
+            sendResetRequestAndWait(partitionToSequenceResetMap, toolbox);
+          }
+          catch (IOException e) {
             throw new ISE(
-                "Starting sequenceNumber [%s] is no longer available for 
partition [%s] and resetOffsetAutomatically is not enabled",
-                sequence,
-                streamPartition.getPartitionId()
+                e,
+                "Exception while attempting to automatically reset sequences 
for partitions[%s]",
+                partitionToSequenceResetMap.keySet()
             );
           }
+        } else {
+          throw new ISE(
+              "Automatic offset reset is disabled, but there are partitions 
with unavailable sequence numbers [%s].",
+              partitionToSequenceResetMap
+          );
         }
       }
     }
@@ -191,5 +205,4 @@ public class KinesisIndexTaskRunner extends 
SeekableStreamIndexTaskRunner<String
       return null;
     }
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to