This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 48a96f5d06b Better automatic offset reset for Kinesis ingestion (#15338)
48a96f5d06b is described below
commit 48a96f5d06bf4b4d1532b0ec96eb631b4287e30d
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Dec 13 12:03:17 2023 +0530
Better automatic offset reset for Kinesis ingestion (#15338)
Better automatic offset reset for Kinesis ingestion
---
.../indexing/kinesis/KinesisIndexTaskRunner.java | 49 ++++++++++++++--------
1 file changed, 31 insertions(+), 18 deletions(-)
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index 22448ba67a0..75f23da0e1f 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -42,12 +42,12 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
public class KinesisIndexTaskRunner extends
SeekableStreamIndexTaskRunner<String, String, ByteEntity>
{
@@ -125,28 +125,42 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
{
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+ final Map<StreamPartition<String>, String> partitionToSequenceResetMap =
new HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition,
KinesisSequenceNumber.of(sequence))) {
- if (task.getTuningConfig().isResetOffsetAutomatically()) {
- log.info("Attempting to reset sequences automatically for all
partitions");
- try {
- sendResetRequestAndWait(
- assignment.stream()
- .collect(Collectors.toMap(x -> x, x ->
currOffsets.get(x.getPartitionId()))),
- toolbox
- );
- }
- catch (IOException e) {
- throw new ISE(e, "Exception while attempting to automatically
reset sequences");
- }
- } else {
+ partitionToSequenceResetMap.put(streamPartition, sequence);
+ }
+ }
+
+ if (!partitionToSequenceResetMap.isEmpty()) {
+ for (Map.Entry<StreamPartition<String>, String> partitionToSequence :
partitionToSequenceResetMap.entrySet()) {
+ log.warn("Starting sequenceNumber[%s] is no longer available for
partition[%s].",
+ partitionToSequence.getValue(),
+ partitionToSequence.getKey()
+ );
+ }
+ if (task.getTuningConfig().isResetOffsetAutomatically()) {
+ log.info(
+ "Attempting to reset offsets for [%d] partitions with ids[%s].",
+ partitionToSequenceResetMap.size(),
+ partitionToSequenceResetMap.keySet()
+ );
+ try {
+ sendResetRequestAndWait(partitionToSequenceResetMap, toolbox);
+ }
+ catch (IOException e) {
throw new ISE(
- "Starting sequenceNumber [%s] is no longer available for
partition [%s] and resetOffsetAutomatically is not enabled",
- sequence,
- streamPartition.getPartitionId()
+ e,
+ "Exception while attempting to automatically reset sequences
for partitions[%s]",
+ partitionToSequenceResetMap.keySet()
);
}
+ } else {
+ throw new ISE(
+ "Automatic offset reset is disabled, but there are partitions
with unavailable sequence numbers [%s].",
+ partitionToSequenceResetMap
+ );
}
}
}
@@ -191,5 +205,4 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
return null;
}
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]