abhishekrb19 commented on code in PR #15338:
URL: https://github.com/apache/druid/pull/15338#discussion_r1399648539
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
{
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+ final Map<StreamPartition<String>, String> shardResetMap = new
HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition,
KinesisSequenceNumber.of(sequence))) {
- if (task.getTuningConfig().isResetOffsetAutomatically()) {
- log.info("Attempting to reset sequences automatically for all
partitions");
- try {
- sendResetRequestAndWait(
- assignment.stream()
- .collect(Collectors.toMap(x -> x, x ->
currOffsets.get(x.getPartitionId()))),
- toolbox
- );
- }
- catch (IOException e) {
- throw new ISE(e, "Exception while attempting to automatically
reset sequences");
- }
- } else {
- throw new ISE(
- "Starting sequenceNumber [%s] is no longer available for
partition [%s] and resetOffsetAutomatically is not enabled",
- sequence,
- streamPartition.getPartitionId()
- );
+ shardResetMap.put(streamPartition, sequence);
+ }
+ }
+
+ if (!shardResetMap.isEmpty()) {
+ for (Map.Entry<StreamPartition<String>, String> partitionToReset :
shardResetMap.entrySet()) {
+ log.warn("Starting sequence number [%s] is no longer available for
partition [%s]",
+ partitionToReset.getValue(),
+ partitionToReset.getKey().getPartitionId()
+ );
+ }
+ if (task.getTuningConfig().isResetOffsetAutomatically()) {
+ log.info("Attempting to reset offsets for [%d] partitions.",
shardResetMap.size());
+ try {
+ sendResetRequestAndWait(shardResetMap, toolbox);
+ }
+ catch (IOException e) {
+ throw new ISE(e, "Exception while attempting to automatically
reset sequences");
Review Comment:
Same as for the `log.info` message: perhaps include the `shardResetMap` keys in the exception message?
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
{
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+ final Map<StreamPartition<String>, String> shardResetMap = new
HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition,
KinesisSequenceNumber.of(sequence))) {
- if (task.getTuningConfig().isResetOffsetAutomatically()) {
- log.info("Attempting to reset sequences automatically for all
partitions");
- try {
- sendResetRequestAndWait(
- assignment.stream()
- .collect(Collectors.toMap(x -> x, x ->
currOffsets.get(x.getPartitionId()))),
- toolbox
- );
- }
- catch (IOException e) {
- throw new ISE(e, "Exception while attempting to automatically
reset sequences");
- }
- } else {
- throw new ISE(
- "Starting sequenceNumber [%s] is no longer available for
partition [%s] and resetOffsetAutomatically is not enabled",
- sequence,
- streamPartition.getPartitionId()
- );
+ shardResetMap.put(streamPartition, sequence);
+ }
+ }
+
+ if (!shardResetMap.isEmpty()) {
+ for (Map.Entry<StreamPartition<String>, String> partitionToReset :
shardResetMap.entrySet()) {
+ log.warn("Starting sequence number [%s] is no longer available for
partition [%s]",
+ partitionToReset.getValue(),
+ partitionToReset.getKey().getPartitionId()
+ );
+ }
+ if (task.getTuningConfig().isResetOffsetAutomatically()) {
+ log.info("Attempting to reset offsets for [%d] partitions.",
shardResetMap.size());
Review Comment:
Partitions that repeatedly fall off the stream and get reset automatically
would indicate an underlying issue, so I think logging the partition keys
(`shardResetMap.keySet()`) would be useful.
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
{
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+ final Map<StreamPartition<String>, String> shardResetMap = new
HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition,
KinesisSequenceNumber.of(sequence))) {
- if (task.getTuningConfig().isResetOffsetAutomatically()) {
- log.info("Attempting to reset sequences automatically for all
partitions");
- try {
- sendResetRequestAndWait(
- assignment.stream()
- .collect(Collectors.toMap(x -> x, x ->
currOffsets.get(x.getPartitionId()))),
- toolbox
- );
- }
- catch (IOException e) {
- throw new ISE(e, "Exception while attempting to automatically
reset sequences");
- }
- } else {
- throw new ISE(
- "Starting sequenceNumber [%s] is no longer available for
partition [%s] and resetOffsetAutomatically is not enabled",
- sequence,
- streamPartition.getPartitionId()
- );
+ shardResetMap.put(streamPartition, sequence);
+ }
+ }
+
+ if (!shardResetMap.isEmpty()) {
+ for (Map.Entry<StreamPartition<String>, String> partitionToReset :
shardResetMap.entrySet()) {
+ log.warn("Starting sequence number [%s] is no longer available for
partition [%s]",
Review Comment:
```suggestion
log.warn("Starting sequenceNumber[%s] is no longer available for
partition[%s]",
```
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java:
##########
@@ -125,28 +125,31 @@ protected void possiblyResetDataSourceMetadata(
{
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
+ final Map<StreamPartition<String>, String> shardResetMap = new
HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition,
KinesisSequenceNumber.of(sequence))) {
- if (task.getTuningConfig().isResetOffsetAutomatically()) {
- log.info("Attempting to reset sequences automatically for all
partitions");
- try {
- sendResetRequestAndWait(
- assignment.stream()
- .collect(Collectors.toMap(x -> x, x ->
currOffsets.get(x.getPartitionId()))),
- toolbox
- );
- }
- catch (IOException e) {
- throw new ISE(e, "Exception while attempting to automatically
reset sequences");
- }
- } else {
- throw new ISE(
- "Starting sequenceNumber [%s] is no longer available for
partition [%s] and resetOffsetAutomatically is not enabled",
- sequence,
- streamPartition.getPartitionId()
- );
+ shardResetMap.put(streamPartition, sequence);
+ }
+ }
+
+ if (!shardResetMap.isEmpty()) {
+ for (Map.Entry<StreamPartition<String>, String> partitionToReset :
shardResetMap.entrySet()) {
+ log.warn("Starting sequence number [%s] is no longer available for
partition [%s]",
+ partitionToReset.getValue(),
+ partitionToReset.getKey().getPartitionId()
+ );
+ }
+ if (task.getTuningConfig().isResetOffsetAutomatically()) {
+ log.info("Attempting to reset offsets for [%d] partitions.",
shardResetMap.size());
+ try {
+ sendResetRequestAndWait(shardResetMap, toolbox);
+ }
+ catch (IOException e) {
+ throw new ISE(e, "Exception while attempting to automatically
reset sequences");
}
+ } else {
+ throw new ISE("Sequence numbers are unavailable but automatic offset
reset is disabled.");
Review Comment:
Could you collect the unavailable partitions above and include them in the
exception message, so that end users don't have to dig through the logs?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]