abhishekrb19 commented on code in PR #15338:
URL: https://github.com/apache/druid/pull/15338#discussion_r1399648539


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
   {
     if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
       final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+      final Map<StreamPartition<String>, String> shardResetMap = new 
HashMap<>();
       for (final StreamPartition<String> streamPartition : assignment) {
         String sequence = currOffsets.get(streamPartition.getPartitionId());
         if (!recordSupplier.isOffsetAvailable(streamPartition, 
KinesisSequenceNumber.of(sequence))) {
-          if (task.getTuningConfig().isResetOffsetAutomatically()) {
-            log.info("Attempting to reset sequences automatically for all 
partitions");
-            try {
-              sendResetRequestAndWait(
-                  assignment.stream()
-                            .collect(Collectors.toMap(x -> x, x -> 
currOffsets.get(x.getPartitionId()))),
-                  toolbox
-              );
-            }
-            catch (IOException e) {
-              throw new ISE(e, "Exception while attempting to automatically 
reset sequences");
-            }
-          } else {
-            throw new ISE(
-                "Starting sequenceNumber [%s] is no longer available for 
partition [%s] and resetOffsetAutomatically is not enabled",
-                sequence,
-                streamPartition.getPartitionId()
-            );
+          shardResetMap.put(streamPartition, sequence);
+        }
+      }
+
+      if (!shardResetMap.isEmpty()) {
+        for (Map.Entry<StreamPartition<String>, String> partitionToReset : 
shardResetMap.entrySet()) {
+          log.warn("Starting sequence number [%s] is no longer available for 
partition [%s]",
+                   partitionToReset.getValue(),
+                   partitionToReset.getKey().getPartitionId()
+          );
+        }
+        if (task.getTuningConfig().isResetOffsetAutomatically()) {
+          log.info("Attempting to reset offsets for [%d] partitions.", 
shardResetMap.size());
+          try {
+            sendResetRequestAndWait(shardResetMap, toolbox);
+          }
+          catch (IOException e) {
+            throw new ISE(e, "Exception while attempting to automatically 
reset sequences");

Review Comment:
   Similarly, perhaps include the `shardResetMap` keys (the affected partitions) in the exception message?



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
   {
     if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
       final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+      final Map<StreamPartition<String>, String> shardResetMap = new 
HashMap<>();
       for (final StreamPartition<String> streamPartition : assignment) {
         String sequence = currOffsets.get(streamPartition.getPartitionId());
         if (!recordSupplier.isOffsetAvailable(streamPartition, 
KinesisSequenceNumber.of(sequence))) {
-          if (task.getTuningConfig().isResetOffsetAutomatically()) {
-            log.info("Attempting to reset sequences automatically for all 
partitions");
-            try {
-              sendResetRequestAndWait(
-                  assignment.stream()
-                            .collect(Collectors.toMap(x -> x, x -> 
currOffsets.get(x.getPartitionId()))),
-                  toolbox
-              );
-            }
-            catch (IOException e) {
-              throw new ISE(e, "Exception while attempting to automatically 
reset sequences");
-            }
-          } else {
-            throw new ISE(
-                "Starting sequenceNumber [%s] is no longer available for 
partition [%s] and resetOffsetAutomatically is not enabled",
-                sequence,
-                streamPartition.getPartitionId()
-            );
+          shardResetMap.put(streamPartition, sequence);
+        }
+      }
+
+      if (!shardResetMap.isEmpty()) {
+        for (Map.Entry<StreamPartition<String>, String> partitionToReset : 
shardResetMap.entrySet()) {
+          log.warn("Starting sequence number [%s] is no longer available for 
partition [%s]",
+                   partitionToReset.getValue(),
+                   partitionToReset.getKey().getPartitionId()
+          );
+        }
+        if (task.getTuningConfig().isResetOffsetAutomatically()) {
+          log.info("Attempting to reset offsets for [%d] partitions.", 
shardResetMap.size());

Review Comment:
   Partitions that repeatedly fall off the stream and get reset automatically 
would indicate an underlying issue, so I think logging the partition keys 
(`shardResetMap.keySet()`) would be useful.



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
   {
     if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
       final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+      final Map<StreamPartition<String>, String> shardResetMap = new 
HashMap<>();
       for (final StreamPartition<String> streamPartition : assignment) {
         String sequence = currOffsets.get(streamPartition.getPartitionId());
         if (!recordSupplier.isOffsetAvailable(streamPartition, 
KinesisSequenceNumber.of(sequence))) {
-          if (task.getTuningConfig().isResetOffsetAutomatically()) {
-            log.info("Attempting to reset sequences automatically for all 
partitions");
-            try {
-              sendResetRequestAndWait(
-                  assignment.stream()
-                            .collect(Collectors.toMap(x -> x, x -> 
currOffsets.get(x.getPartitionId()))),
-                  toolbox
-              );
-            }
-            catch (IOException e) {
-              throw new ISE(e, "Exception while attempting to automatically 
reset sequences");
-            }
-          } else {
-            throw new ISE(
-                "Starting sequenceNumber [%s] is no longer available for 
partition [%s] and resetOffsetAutomatically is not enabled",
-                sequence,
-                streamPartition.getPartitionId()
-            );
+          shardResetMap.put(streamPartition, sequence);
+        }
+      }
+
+      if (!shardResetMap.isEmpty()) {
+        for (Map.Entry<StreamPartition<String>, String> partitionToReset : 
shardResetMap.entrySet()) {
+          log.warn("Starting sequence number [%s] is no longer available for 
partition [%s]",

Review Comment:
   ```suggestion
             log.warn("Starting sequenceNumber[%s] is no longer available for 
partition[%s]",
   ```



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
   {
     if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
       final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+      final Map<StreamPartition<String>, String> shardResetMap = new 
HashMap<>();
       for (final StreamPartition<String> streamPartition : assignment) {
         String sequence = currOffsets.get(streamPartition.getPartitionId());
         if (!recordSupplier.isOffsetAvailable(streamPartition, 
KinesisSequenceNumber.of(sequence))) {
-          if (task.getTuningConfig().isResetOffsetAutomatically()) {
-            log.info("Attempting to reset sequences automatically for all 
partitions");
-            try {
-              sendResetRequestAndWait(
-                  assignment.stream()
-                            .collect(Collectors.toMap(x -> x, x -> 
currOffsets.get(x.getPartitionId()))),
-                  toolbox
-              );
-            }
-            catch (IOException e) {
-              throw new ISE(e, "Exception while attempting to automatically 
reset sequences");
-            }
-          } else {
-            throw new ISE(
-                "Starting sequenceNumber [%s] is no longer available for 
partition [%s] and resetOffsetAutomatically is not enabled",
-                sequence,
-                streamPartition.getPartitionId()
-            );
+          shardResetMap.put(streamPartition, sequence);
+        }
+      }
+
+      if (!shardResetMap.isEmpty()) {
+        for (Map.Entry<StreamPartition<String>, String> partitionToReset : 
shardResetMap.entrySet()) {
+          log.warn("Starting sequence number [%s] is no longer available for 
partition [%s]",
+                   partitionToReset.getValue(),
+                   partitionToReset.getKey().getPartitionId()
+          );
+        }
+        if (task.getTuningConfig().isResetOffsetAutomatically()) {
+          log.info("Attempting to reset offsets for [%d] partitions.", 
shardResetMap.size());
+          try {
+            sendResetRequestAndWait(shardResetMap, toolbox);
+          }
+          catch (IOException e) {
+            throw new ISE(e, "Exception while attempting to automatically 
reset sequences");
           }
+        } else {
+          throw new ISE("Sequence numbers are unavailable but automatic offset 
reset is disabled.");

Review Comment:
   Could we collect the unavailable partitions above and include them in the exception 
message, so that end users don't have to dig through the logs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to