suraj-goel commented on code in PR #18525:
URL: https://github.com/apache/druid/pull/18525#discussion_r2448152732
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java:
##########
@@ -172,16 +198,35 @@ public Set<StreamPartition<KafkaTopicPartition>> getAssignment()
public List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> poll(long timeout)
{
List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
+
for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
+ KafkaTopicPartition kafkaPartition = new KafkaTopicPartition(multiTopic, record.topic(), record.partition());
+
+ // Apply header filter if configured
+ if (headerFilterEvaluator != null && !headerFilterEvaluator.shouldIncludeRecord(record)) {
+ // Create filtered record for offset advancement with filtered=true flag
+ polledRecords.add(new OrderedPartitionableRecord<>(
+ record.topic(),
+ kafkaPartition,
+ record.offset(),
+ Collections.emptyList(), // Empty list for filtered records
+ record.timestamp(),
+ true // Mark as filtered
+ ));
+ continue;
Review Comment:
We also pass an empty Kafka record when `shouldIncludeRecord` returns false.
IMO the current implementation looks cleaner. Let me know if you think otherwise.
Regarding "inclusion" vs. "filtering" in the naming, please refer to
https://github.com/apache/druid/pull/18525#discussion_r2448146529
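
To illustrate why an empty record is still emitted for filtered-out messages, here is a minimal, self-contained sketch. It does not use the Druid or Kafka classes: `RawRecord`, `PolledRecord`, `toPolledRecords` and `nextOffsets` are hypothetical stand-ins invented for this example, and a plain `Predicate` plays the role of the header filter. It only assumes that the downstream consumer derives the next offset per partition from the records it is handed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class FilteredRecordSketch
{
  // Hypothetical stand-in for a raw consumer record (not Kafka's ConsumerRecord).
  static final class RawRecord
  {
    final int partition;
    final long offset;
    final String header;
    final byte[] value;

    RawRecord(int partition, long offset, String header, byte[] value)
    {
      this.partition = partition;
      this.offset = offset;
      this.header = header;
      this.value = value;
    }
  }

  // Hypothetical stand-in for the polled record type, carrying a "filtered" flag.
  static final class PolledRecord
  {
    final int partition;
    final long offset;
    final List<byte[]> data;
    final boolean filtered;

    PolledRecord(int partition, long offset, List<byte[]> data, boolean filtered)
    {
      this.partition = partition;
      this.offset = offset;
      this.data = data;
      this.filtered = filtered;
    }
  }

  // Emits one output record per input record. Records rejected by the filter
  // become empty placeholders so that their offsets are still visible downstream.
  static List<PolledRecord> toPolledRecords(List<RawRecord> raw, Predicate<RawRecord> shouldInclude)
  {
    List<PolledRecord> polled = new ArrayList<>();
    for (RawRecord record : raw) {
      if (shouldInclude != null && !shouldInclude.test(record)) {
        polled.add(new PolledRecord(record.partition, record.offset, Collections.emptyList(), true));
        continue;
      }
      polled.add(new PolledRecord(record.partition, record.offset, Collections.singletonList(record.value), false));
    }
    return polled;
  }

  // Computes the next offset to read per partition from everything that was polled,
  // including filtered placeholders.
  static Map<Integer, Long> nextOffsets(List<PolledRecord> polled)
  {
    Map<Integer, Long> next = new HashMap<>();
    for (PolledRecord record : polled) {
      next.merge(record.partition, record.offset + 1, Long::max);
    }
    return next;
  }

  public static void main(String[] args)
  {
    List<RawRecord> raw = Arrays.asList(
        new RawRecord(0, 10L, "keep", new byte[]{1}),
        new RawRecord(0, 11L, "drop", new byte[]{2})
    );
    List<PolledRecord> polled = toPolledRecords(raw, r -> "keep".equals(r.header));
    System.out.println(nextOffsets(polled));
  }
}
```

In this sketch, if the filtered record at offset 11 were dropped instead of emitted as a placeholder, the computed next offset for partition 0 would stay at 11 rather than move past it.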
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]