AmatyaAvadhanula commented on code in PR #15338:
URL: https://github.com/apache/druid/pull/15338#discussion_r1423403298
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
{
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+ final Map<StreamPartition<String>, String> shardResetMap = new
HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition,
KinesisSequenceNumber.of(sequence))) {
- if (task.getTuningConfig().isResetOffsetAutomatically()) {
- log.info("Attempting to reset sequences automatically for all
partitions");
- try {
- sendResetRequestAndWait(
- assignment.stream()
- .collect(Collectors.toMap(x -> x, x ->
currOffsets.get(x.getPartitionId()))),
- toolbox
- );
- }
- catch (IOException e) {
- throw new ISE(e, "Exception while attempting to automatically
reset sequences");
- }
- } else {
- throw new ISE(
- "Starting sequenceNumber [%s] is no longer available for
partition [%s] and resetOffsetAutomatically is not enabled",
- sequence,
- streamPartition.getPartitionId()
- );
+ shardResetMap.put(streamPartition, sequence);
+ }
+ }
+
+ if (!shardResetMap.isEmpty()) {
+ for (Map.Entry<StreamPartition<String>, String> partitionToReset :
shardResetMap.entrySet()) {
+ log.warn("Starting sequence number [%s] is no longer available for
partition [%s]",
Review Comment:
Done
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
{
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+ final Map<StreamPartition<String>, String> shardResetMap = new
HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition,
KinesisSequenceNumber.of(sequence))) {
- if (task.getTuningConfig().isResetOffsetAutomatically()) {
- log.info("Attempting to reset sequences automatically for all
partitions");
- try {
- sendResetRequestAndWait(
- assignment.stream()
- .collect(Collectors.toMap(x -> x, x ->
currOffsets.get(x.getPartitionId()))),
- toolbox
- );
- }
- catch (IOException e) {
- throw new ISE(e, "Exception while attempting to automatically
reset sequences");
- }
- } else {
- throw new ISE(
- "Starting sequenceNumber [%s] is no longer available for
partition [%s] and resetOffsetAutomatically is not enabled",
- sequence,
- streamPartition.getPartitionId()
- );
+ shardResetMap.put(streamPartition, sequence);
+ }
+ }
+
+ if (!shardResetMap.isEmpty()) {
+ for (Map.Entry<StreamPartition<String>, String> partitionToReset :
shardResetMap.entrySet()) {
+ log.warn("Starting sequence number [%s] is no longer available for
partition [%s]",
+ partitionToReset.getValue(),
+ partitionToReset.getKey().getPartitionId()
+ );
+ }
+ if (task.getTuningConfig().isResetOffsetAutomatically()) {
+ log.info("Attempting to reset offsets for [%d] partitions.",
shardResetMap.size());
Review Comment:
Done
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
{
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+ final Map<StreamPartition<String>, String> shardResetMap = new
HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition,
KinesisSequenceNumber.of(sequence))) {
- if (task.getTuningConfig().isResetOffsetAutomatically()) {
- log.info("Attempting to reset sequences automatically for all
partitions");
- try {
- sendResetRequestAndWait(
- assignment.stream()
- .collect(Collectors.toMap(x -> x, x ->
currOffsets.get(x.getPartitionId()))),
- toolbox
- );
- }
- catch (IOException e) {
- throw new ISE(e, "Exception while attempting to automatically
reset sequences");
- }
- } else {
- throw new ISE(
- "Starting sequenceNumber [%s] is no longer available for
partition [%s] and resetOffsetAutomatically is not enabled",
- sequence,
- streamPartition.getPartitionId()
- );
+ shardResetMap.put(streamPartition, sequence);
+ }
+ }
+
+ if (!shardResetMap.isEmpty()) {
+ for (Map.Entry<StreamPartition<String>, String> partitionToReset :
shardResetMap.entrySet()) {
+ log.warn("Starting sequence number [%s] is no longer available for
partition [%s]",
+ partitionToReset.getValue(),
+ partitionToReset.getKey().getPartitionId()
+ );
+ }
+ if (task.getTuningConfig().isResetOffsetAutomatically()) {
+ log.info("Attempting to reset offsets for [%d] partitions.",
shardResetMap.size());
+ try {
+ sendResetRequestAndWait(shardResetMap, toolbox);
+ }
+ catch (IOException e) {
+ throw new ISE(e, "Exception while attempting to automatically
reset sequences");
Review Comment:
Done
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
{
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+ final Map<StreamPartition<String>, String> shardResetMap = new
HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition,
KinesisSequenceNumber.of(sequence))) {
- if (task.getTuningConfig().isResetOffsetAutomatically()) {
- log.info("Attempting to reset sequences automatically for all
partitions");
- try {
- sendResetRequestAndWait(
- assignment.stream()
- .collect(Collectors.toMap(x -> x, x ->
currOffsets.get(x.getPartitionId()))),
- toolbox
- );
- }
- catch (IOException e) {
- throw new ISE(e, "Exception while attempting to automatically
reset sequences");
- }
- } else {
- throw new ISE(
- "Starting sequenceNumber [%s] is no longer available for
partition [%s] and resetOffsetAutomatically is not enabled",
- sequence,
- streamPartition.getPartitionId()
- );
+ shardResetMap.put(streamPartition, sequence);
+ }
+ }
+
+ if (!shardResetMap.isEmpty()) {
+ for (Map.Entry<StreamPartition<String>, String> partitionToReset :
shardResetMap.entrySet()) {
+ log.warn("Starting sequence number [%s] is no longer available for
partition [%s]",
+ partitionToReset.getValue(),
+ partitionToReset.getKey().getPartitionId()
+ );
+ }
+ if (task.getTuningConfig().isResetOffsetAutomatically()) {
+ log.info("Attempting to reset offsets for [%d] partitions.",
shardResetMap.size());
+ try {
+ sendResetRequestAndWait(shardResetMap, toolbox);
+ }
+ catch (IOException e) {
+ throw new ISE(e, "Exception while attempting to automatically
reset sequences");
}
+ } else {
+ throw new ISE("Sequence numbers are unavailable but automatic offset
reset is disabled.");
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]