This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 00f0721662 Fix false data loss alerts in case of read_committed Kafka 
isolation (#14716)
00f0721662 is described below

commit 00f0721662c2e9a3c4a1702bf13dbd937488295a
Author: Kartik Khare <[email protected]>
AuthorDate: Thu Dec 26 12:37:12 2024 +0530

    Fix false data loss alerts in case of read_committed Kafka isolation 
(#14716)
    
    * Fix false data loss alerts in case of read_committed Kafka isolation
    
    * Fix null handling
    
    ---------
    
    Co-authored-by: Kartik Khare <[email protected]>
---
 .../plugin/stream/kafka20/KafkaPartitionLevelConsumer.java    | 11 ++++++++++-
 .../plugin/stream/kafka30/KafkaPartitionLevelConsumer.java    | 11 ++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index c1d4873abf..251b378ab9 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamMessageMetadata;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.BytesStreamMessage;
@@ -88,8 +89,16 @@ public class KafkaPartitionLevelConsumer extends 
KafkaPartitionLevelConnectionHa
       }
     }
 
+    // In case read_committed is enabled, the messages consumed are not 
guaranteed to have consecutive offsets.
+    // TODO: A better solution would be to fetch earliest offset from topic 
and see if it is greater than startOffset.
+    // However, this would require an additional call to Kafka which we want 
to avoid.
+    boolean hasDataLoss = false;
+    if (_config.getKafkaIsolationLevel() == null || 
_config.getKafkaIsolationLevel()
+        
.equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED))
 {
+      hasDataLoss = firstOffset > startOffset;
+    }
     return new KafkaMessageBatch(filteredRecords, records.size(), 
offsetOfNextBatch, firstOffset, lastMessageMetadata,
-        firstOffset > startOffset);
+        hasDataLoss);
   }
 
   private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, 
Bytes> record) {
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
index 0003204067..2e0e910f7c 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamMessageMetadata;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.BytesStreamMessage;
@@ -88,8 +89,16 @@ public class KafkaPartitionLevelConsumer extends 
KafkaPartitionLevelConnectionHa
       }
     }
 
+    // In case read_committed is enabled, the messages consumed are not 
guaranteed to have consecutive offsets.
+    // TODO: A better solution would be to fetch earliest offset from topic 
and see if it is greater than startOffset.
+    // However, this would require an additional call to Kafka which we want 
to avoid.
+    boolean hasDataLoss = false;
+    if (_config.getKafkaIsolationLevel() == null || 
_config.getKafkaIsolationLevel()
+        
.equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED))
 {
+      hasDataLoss = firstOffset > startOffset;
+    }
     return new KafkaMessageBatch(filteredRecords, records.size(), 
offsetOfNextBatch, firstOffset, lastMessageMetadata,
-        firstOffset > startOffset);
+        hasDataLoss);
   }
 
   private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String, 
Bytes> record) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to