This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 00f0721662 Fix false data loss alerts in case of read_committed Kafka
isolation (#14716)
00f0721662 is described below
commit 00f0721662c2e9a3c4a1702bf13dbd937488295a
Author: Kartik Khare <[email protected]>
AuthorDate: Thu Dec 26 12:37:12 2024 +0530
Fix false data loss alerts in case of read_committed Kafka isolation
(#14716)
* Fix false data loss alerts in case of read_committed Kafka isolation
* Fix null handling
---------
Co-authored-by: Kartik Khare <[email protected]>
---
.../plugin/stream/kafka20/KafkaPartitionLevelConsumer.java | 11 ++++++++++-
.../plugin/stream/kafka30/KafkaPartitionLevelConsumer.java | 11 ++++++++++-
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index c1d4873abf..251b378ab9 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Bytes;
import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamMessageMetadata;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.BytesStreamMessage;
@@ -88,8 +89,16 @@ public class KafkaPartitionLevelConsumer extends
KafkaPartitionLevelConnectionHa
}
}
+ // In case read_committed is enabled, the messages consumed are not
guaranteed to have consecutive offsets.
+ // TODO: A better solution would be to fetch earliest offset from topic
and see if it is greater than startOffset.
+ // However, this would require an additional call to Kafka which we want
to avoid.
+ boolean hasDataLoss = false;
+ if (_config.getKafkaIsolationLevel() == null ||
_config.getKafkaIsolationLevel()
+
.equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED))
{
+ hasDataLoss = firstOffset > startOffset;
+ }
return new KafkaMessageBatch(filteredRecords, records.size(),
offsetOfNextBatch, firstOffset, lastMessageMetadata,
- firstOffset > startOffset);
+ hasDataLoss);
}
private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String,
Bytes> record) {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
index 0003204067..2e0e910f7c 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Bytes;
import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
+import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamMessageMetadata;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.BytesStreamMessage;
@@ -88,8 +89,16 @@ public class KafkaPartitionLevelConsumer extends
KafkaPartitionLevelConnectionHa
}
}
+ // In case read_committed is enabled, the messages consumed are not
guaranteed to have consecutive offsets.
+ // TODO: A better solution would be to fetch earliest offset from topic
and see if it is greater than startOffset.
+ // However, this would require an additional call to Kafka which we want
to avoid.
+ boolean hasDataLoss = false;
+ if (_config.getKafkaIsolationLevel() == null ||
_config.getKafkaIsolationLevel()
+
.equals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED))
{
+ hasDataLoss = firstOffset > startOffset;
+ }
return new KafkaMessageBatch(filteredRecords, records.size(),
offsetOfNextBatch, firstOffset, lastMessageMetadata,
- firstOffset > startOffset);
+ hasDataLoss);
}
private StreamMessageMetadata extractMessageMetadata(ConsumerRecord<String,
Bytes> record) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]