samarthjain commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r759879813



##########
File path: docs/development/extensions-core/kafka-supervisor-reference.md
##########
@@ -189,7 +189,7 @@ The `tuningConfig` is optional and default parameters will 
be used if no `tuning
 | `indexSpecForIntermediatePersists`|                | Defines segment storage 
format options to be used at indexing time for intermediate persisted temporary 
segments. This can be used to disable dimension/metric compression on 
intermediate segments to reduce memory required for final merging. However, 
disabling compression on intermediate segments might increase page cache use 
while they are used before getting merged into final segment published, see 
[IndexSpec](#indexspec) for possible values.                                    
                                                                                
                                                                 | no (default 
= same as `indexSpec`)                                                          
                   |
 | `reportParseExceptions`           | Boolean        | *DEPRECATED*. If true, 
exceptions encountered during parsing will be thrown and will halt ingestion; 
if false, unparseable rows and fields will be skipped. Setting 
`reportParseExceptions` to true will override existing configurations for 
`maxParseExceptions` and `maxSavedParseExceptions`, setting 
`maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more 
than 1.                                                                         
                                                                                
                                                                                
              | no (default == false)                                           
                                             |
 | `handoffConditionTimeout`         | Long           | Milliseconds to wait 
for segment handoff. It must be >= 0, where 0 means to wait forever.            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                               | no (default == 0)              
                                                                              |
-| `resetOffsetAutomatically`        | Boolean        | Controls behavior when 
Druid needs to read Kafka messages that are no longer available (i.e. when 
`OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception 
will bubble up, which will cause your tasks to fail and ingestion to halt. If 
this occurs, manual intervention is required to correct the situation; 
potentially using the [Reset Supervisor 
API](../../operations/api-reference.md#supervisors). This mode is useful for 
production, since it will make you aware of issues with ingestion.<br/><br/>If 
true, Druid will automatically reset to the earlier or latest offset available 
in Kafka, based on the value of the `useEarliestOffset` property (earliest if 
true, latest if false). Note that this can lead to data being _DROPPED_ (if 
`useEarliestOffset` is false) or _DUPLICATED_ (if `useEarliestOffset` is true) 
without your knowledge. Messages will be logged indicating that a reset has 
occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make 
Druid attempt to recover from problems automatically, even if they lead to 
quiet dropping or duplicating of data.<br/><br/>This feature behaves similarly 
to the Kafka `auto.offset.reset` consumer property. | no (default == false) |
+| `resetOffsetAutomatically`        | Boolean        | Controls behavior when 
Druid needs to read Kafka messages that are no longer available (i.e. when 
`OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception 
will bubble up, which will cause your tasks to fail and ingestion to halt. If 
this occurs, manual intervention is required to correct the situation; 
potentially using the [Reset Supervisor 
API](../../operations/api-reference.md#supervisors). This mode is useful for 
production, since it will make you aware of issues with ingestion.<br/><br/>If 
true, Druid will automatically reset to the least offset available in Kafka. 
Note that this can lead to data being _DROPPED_ without your knowledge. 
Messages will be logged indicating that a reset has occurred, but ingestion 
will continue. This mode is useful for non-production situations, since it will 
make Druid attempt to recover from problems automatically, even if they lead to 
quiet dropping.<br/><br/>This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no 
(default == false) |

Review comment:
       This matches the behavior I have observed. I don't see where the 
`useEarliestOffset` property is actually being used. 
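
To make the documented behavior concrete, here is a minimal sketch of the decision as I read it from the updated doc text. It is not the code in this PR: the class `ResetOffsetSketch` and the method `resolveFetchOffset` are hypothetical names, and returning the offset unchanged when it is not below the earliest available offset is an assumption (the real task would presumably wait and retry).

```java
public class ResetOffsetSketch {

    /**
     * Hypothetical helper: decides which offset to resume from after an
     * out-of-range error. Not part of Druid; for illustration only.
     */
    static long resolveFetchOffset(
        long nextFetchingOffset,
        long earliestAvailableOffset,
        boolean resetOffsetAutomatically
    ) {
        if (!resetOffsetAutomatically) {
            // Documented behavior for "false": the error bubbles up and the task fails.
            throw new IllegalStateException(
                "Offset " + nextFetchingOffset + " is out of range and resetOffsetAutomatically is false"
            );
        }
        if (nextFetchingOffset < earliestAvailableOffset) {
            // Messages before earliestAvailableOffset are gone, so they are skipped (data is dropped).
            return earliestAvailableOffset;
        }
        // Assumption: nothing to reset here; the caller would wait and retry.
        return nextFetchingOffset;
    }

    public static void main(String[] args) {
        System.out.println(resolveFetchOffset(42L, 100L, true));   // prints 100
        System.out.println(resolveFetchOffset(150L, 100L, true));  // prints 150
    }
}
```

Note that `useEarliestOffset` does not appear anywhere in this decision, which is consistent with the doc change.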

##########
File path: 
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : 
outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();

Review comment:
       Would a better name be `outOfRangeOffset`? 

##########
File path: docs/development/extensions-core/kafka-supervisor-reference.md
##########
@@ -189,7 +189,7 @@ The `tuningConfig` is optional and default parameters will 
be used if no `tuning
 | `indexSpecForIntermediatePersists`|                | Defines segment storage 
format options to be used at indexing time for intermediate persisted temporary 
segments. This can be used to disable dimension/metric compression on 
intermediate segments to reduce memory required for final merging. However, 
disabling compression on intermediate segments might increase page cache use 
while they are used before getting merged into final segment published, see 
[IndexSpec](#indexspec) for possible values.                                    
                                                                                
                                                                 | no (default 
= same as `indexSpec`)                                                          
                   |
 | `reportParseExceptions`           | Boolean        | *DEPRECATED*. If true, 
exceptions encountered during parsing will be thrown and will halt ingestion; 
if false, unparseable rows and fields will be skipped. Setting 
`reportParseExceptions` to true will override existing configurations for 
`maxParseExceptions` and `maxSavedParseExceptions`, setting 
`maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more 
than 1.                                                                         
                                                                                
                                                                                
              | no (default == false)                                           
                                             |
 | `handoffConditionTimeout`         | Long           | Milliseconds to wait 
for segment handoff. It must be >= 0, where 0 means to wait forever.            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                               | no (default == 0)              
                                                                              |
-| `resetOffsetAutomatically`        | Boolean        | Controls behavior when 
Druid needs to read Kafka messages that are no longer available (i.e. when 
`OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception 
will bubble up, which will cause your tasks to fail and ingestion to halt. If 
this occurs, manual intervention is required to correct the situation; 
potentially using the [Reset Supervisor 
API](../../operations/api-reference.md#supervisors). This mode is useful for 
production, since it will make you aware of issues with ingestion.<br/><br/>If 
true, Druid will automatically reset to the earlier or latest offset available 
in Kafka, based on the value of the `useEarliestOffset` property (earliest if 
true, latest if false). Note that this can lead to data being _DROPPED_ (if 
`useEarliestOffset` is false) or _DUPLICATED_ (if `useEarliestOffset` is true) 
without your knowledge. Messages will be logged indicating that a reset has 
occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make 
Druid attempt to recover from problems automatically, even if they lead to 
quiet dropping or duplicating of data.<br/><br/>This feature behaves similarly 
to the Kafka `auto.offset.reset` consumer property. | no (default == false) |
+| `resetOffsetAutomatically`        | Boolean        | Controls behavior when 
Druid needs to read Kafka messages that are no longer available (i.e. when 
`OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception 
will bubble up, which will cause your tasks to fail and ingestion to halt. If 
this occurs, manual intervention is required to correct the situation; 
potentially using the [Reset Supervisor 
API](../../operations/api-reference.md#supervisors). This mode is useful for 
production, since it will make you aware of issues with ingestion.<br/><br/>If 
true, Druid will automatically reset to the least offset available in Kafka. 
Note that this can lead to data being _DROPPED_ without your knowledge. 
Messages will be logged indicating that a reset has occurred, but ingestion 
will continue. This mode is useful for non-production situations, since it will 
make Druid attempt to recover from problems automatically, even if they lead to 
quiet dropping.<br/><br/>This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no 
(default == false) |

Review comment:
       Kafka terminology is EARLIEST or LATEST offset. We should probably still 
have it as `earliest offset available in Kafka`.

##########
File path: 
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : 
outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
+
         final Long leastAvailableOffset = 
recordSupplier.getEarliestSequenceNumber(streamPartition);
         if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from 
kafka!",
-              topicPartition.partition()
-          );
+          throw new ISE("got null earliest sequence number for partition[%s] 
when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than 
the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (nextFetchingOffset < leastAvailableOffset) {
+          // reset offset to the least available position since it's unable to 
read messages from nextFetchingOffset
+          recordSupplier.seek(streamPartition, leastAvailableOffset);
+

Review comment:
       nit: can probably get rid of the empty line here. 

##########
File path: 
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : 
outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
+
         final Long leastAvailableOffset = 
recordSupplier.getEarliestSequenceNumber(streamPartition);

Review comment:
       I would probably name it as `earliestAvailableOffset` to match Kafka 
terminology. But it's ok if you don't do it in this PR :). 

##########
File path: 
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : 
outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
+
         final Long leastAvailableOffset = 
recordSupplier.getEarliestSequenceNumber(streamPartition);
         if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from 
kafka!",
-              topicPartition.partition()
-          );
+          throw new ISE("got null earliest sequence number for partition[%s] 
when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);

Review comment:
       I am not 100% sure if we can get rid of this seek. The purpose of this 
seek is to reset the offset back to what it was before 
`getEarliestSequenceNumber` was called. 
   
   
   `recordSupplier.getEarliestSequenceNumber(streamPartition)` is doing the 
following:
   ```
   Long currPos = getPosition(partition);
   seekToEarliest(Collections.singleton(partition));
   Long nextPos = getPosition(partition);
   seek(partition, currPos);
   ```
   If it can be guaranteed that `currPos` (from `Long currPos = getPosition(partition)`) 
holds the same offset as `nextOffset`, then we can get rid of this seek. 
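
To illustrate the concern, here is a small self-contained sketch of the save-and-restore pattern quoted above. `FakeSupplier` is a hypothetical single-partition stand-in, not Druid's record supplier, and the `return nextPos` line is my assumption about how the quoted snippet ends.

```java
public class SeekRestoreSketch {

    /** Toy single-partition supplier; hypothetical, for illustration only. */
    static class FakeSupplier {
        private final long earliestOffset;
        private long position;

        FakeSupplier(long earliestOffset, long startPosition) {
            this.earliestOffset = earliestOffset;
            this.position = startPosition;
        }

        long getPosition() {
            return position;
        }

        void seek(long offset) {
            position = offset;
        }

        void seekToEarliest() {
            position = earliestOffset;
        }

        /** Same four steps as the quoted snippet, plus an assumed return. */
        long getEarliestSequenceNumber() {
            long currPos = getPosition();   // remember where we were
            seekToEarliest();               // jump to the earliest offset
            long nextPos = getPosition();   // read it back
            seek(currPos);                  // restore the remembered position
            return nextPos;
        }
    }

    public static void main(String[] args) {
        FakeSupplier supplier = new FakeSupplier(100L, 42L);
        long earliest = supplier.getEarliestSequenceNumber();
        System.out.println(earliest);                // prints 100
        System.out.println(supplier.getPosition());  // prints 42
    }
}
```

In this toy, the position after the call is whatever it was before the call. So the explicit `seek(streamPartition, nextOffset)` is redundant only if the position before the call already equals `nextOffset`; otherwise removing it changes where the consumer ends up.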




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



