FrankChen021 commented on code in PR #19311: URL: https://github.com/apache/druid/pull/19311#discussion_r3241286739
########## extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaShareGroupRecordSupplier.java: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.kafka.KafkaRecordEntity; +import org.apache.druid.data.input.kafka.KafkaTopicPartition; +import org.apache.druid.indexing.seekablestream.common.AcknowledgeType; +import org.apache.druid.indexing.seekablestream.common.AcknowledgingRecordSupplier; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import javax.annotation.Nonnull; +import 
javax.validation.constraints.NotNull; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Adapts {@link KafkaShareConsumer} to {@link AcknowledgingRecordSupplier}. + * Delivery state lives on the broker; the supplier sets {@code group.id} to + * the configured share group name. + */ +public class KafkaShareGroupRecordSupplier + implements AcknowledgingRecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> +{ + private static final Logger log = new Logger(KafkaShareGroupRecordSupplier.class); + + private final KafkaShareConsumer<byte[], byte[]> consumer; + + /** + * Records returned by the most recent {@link #poll(long)}, retained so + * {@link #acknowledge} can pass the original {@link ConsumerRecord} to the + * Kafka client. The {@code (topic, partition, offset)} ack variant is fragile + * once the share-fetch buffer rolls over (KIP-932). 
+ */ + private final Map<RecordKey, ConsumerRecord<byte[], byte[]>> deliveredRecords = new HashMap<>(); + private boolean closed; + + public KafkaShareGroupRecordSupplier( + Map<String, Object> consumerProperties, + ObjectMapper sortingMapper, + String groupId + ) + { + this(createShareConsumer(consumerProperties, sortingMapper, groupId)); + } + + @VisibleForTesting + public KafkaShareGroupRecordSupplier(KafkaShareConsumer<byte[], byte[]> consumer) + { + this.consumer = consumer; + } + + @Override + public void subscribe(Set<String> topics) + { + consumer.subscribe(topics); + } + + @Override + public void unsubscribe() + { + consumer.unsubscribe(); + } + + @Override + public Set<String> subscription() + { + return consumer.subscription(); + } + + @NotNull + @Override + public List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> poll(long timeoutMs) + { + deliveredRecords.clear(); + final List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> polledRecords = + new ArrayList<>(); + for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeoutMs))) { + deliveredRecords.put(new RecordKey(record.topic(), record.partition(), record.offset()), record); + polledRecords.add(new OrderedPartitionableRecord<>( + record.topic(), + new KafkaTopicPartition(true, record.topic(), record.partition()), + record.offset(), + record.value() == null ? 
null : ImmutableList.of(new KafkaRecordEntity(record)), + record.timestamp() + )); + } + return polledRecords; + } + + @Override + public void acknowledge(KafkaTopicPartition partitionId, Long offset) + { + acknowledge(partitionId, offset, AcknowledgeType.ACCEPT); + } + + @Override + public void acknowledge(KafkaTopicPartition partitionId, Long offset, AcknowledgeType type) + { + final String topic = partitionId.topic().orElseThrow( + () -> new IllegalArgumentException("Cannot acknowledge record without topic") + ); + final ConsumerRecord<byte[], byte[]> record = deliveredRecords.get( + new RecordKey(topic, partitionId.partition(), offset) + ); + if (record == null) { + throw new IllegalStateException(StringUtils.format( + "Cannot acknowledge unknown record at topic[%s] partition[%d] offset[%d]; " + + "either it was not delivered by the most recent poll() or it has already been acknowledged.", + topic, partitionId.partition(), offset + )); + } + consumer.acknowledge(record, toKafkaAcknowledgeType(type)); + } + + @Override + public void acknowledge( + Map<KafkaTopicPartition, Collection<Long>> offsets, + AcknowledgeType type + ) + { + final org.apache.kafka.clients.consumer.AcknowledgeType kafkaType = toKafkaAcknowledgeType(type); + for (Map.Entry<KafkaTopicPartition, Collection<Long>> entry : offsets.entrySet()) { + final KafkaTopicPartition partition = entry.getKey(); + final String topic = partition.topic().orElseThrow( + () -> new IllegalArgumentException("Cannot acknowledge record without topic") + ); + for (Long offset : entry.getValue()) { + final ConsumerRecord<byte[], byte[]> record = deliveredRecords.get( + new RecordKey(topic, partition.partition(), offset) + ); + if (record == null) { + throw new IllegalStateException(StringUtils.format( + "Cannot acknowledge unknown record at topic[%s] partition[%d] offset[%d]; " + + "either it was not delivered by the most recent poll() or it has already been acknowledged.", + topic, partition.partition(), offset + )); 
+ } + consumer.acknowledge(record, kafkaType); + } + } + } + + @Override + public Map<KafkaTopicPartition, Optional<Exception>> commitSync() + { + final Map<TopicIdPartition, Optional<KafkaException>> result = consumer.commitSync(); + final Map<KafkaTopicPartition, Optional<Exception>> mapped = new HashMap<>(); + for (Map.Entry<TopicIdPartition, Optional<KafkaException>> entry : result.entrySet()) { + final TopicIdPartition tip = entry.getKey(); + mapped.put( + new KafkaTopicPartition(true, tip.topic(), tip.partition()), + entry.getValue().map(e -> (Exception) e) + ); + } + return mapped; + } + + @Override + public Set<KafkaTopicPartition> getPartitionIds(String stream) + { + // Share consumer does not expose partition assignment; broker manages it. + return Collections.emptySet(); + } + + @Override + public void wakeup() + { + consumer.wakeup(); + } + + @Override + public Optional<Integer> acquisitionLockTimeoutMs() + { + return consumer.acquisitionLockTimeoutMs(); + } + + @Override + public void close() + { + if (closed) { + return; + } + closed = true; + try { + consumer.close(); + } + catch (Exception e) { + log.warn(e, "Exception closing KafkaShareConsumer"); + } + } + + @Nonnull + private static org.apache.kafka.clients.consumer.AcknowledgeType toKafkaAcknowledgeType(AcknowledgeType type) + { + switch (type) { + case ACCEPT: + return org.apache.kafka.clients.consumer.AcknowledgeType.ACCEPT; + case RELEASE: + return org.apache.kafka.clients.consumer.AcknowledgeType.RELEASE; + case REJECT: + return org.apache.kafka.clients.consumer.AcknowledgeType.REJECT; + case RENEW: + return org.apache.kafka.clients.consumer.AcknowledgeType.RENEW; + default: + throw new IllegalArgumentException("Unknown acknowledge type: " + type); + } + } + + private static KafkaShareConsumer<byte[], byte[]> createShareConsumer( + Map<String, Object> consumerProperties, + ObjectMapper sortingMapper, + String groupId + ) + { + final Map<String, Object> sanitized = 
ShareGroupConsumerProperties.sanitize(consumerProperties); Review Comment: [P2] Unsupported share-consumer configs can still enter through dynamic config: sanitize() runs before KafkaRecordSupplier.addConsumerPropertiesFromConfig, but that helper expands druid.dynamic.config.provider afterward and copies all returned entries directly into the Properties. If a dynamic provider supplies a forbidden key such as enable.auto.commit, auto.offset.reset, or group.instance.id, KafkaShareConsumer still receives it and the task can fail at startup despite the documented stripping behavior. Sanitize after dynamic expansion as well, or expand into a map first and then strip unsupported keys once. ########## docs/ingestion/kafka-share-group-ingestion.md: ########## @@ -0,0 +1,337 @@ +--- +id: kafka-share-group-ingestion +title: "Kafka share group ingestion" +sidebar_label: "Kafka share group ingestion" +description: "Queue-semantics ingestion from Apache Kafka using share groups (KIP-932). Scale consumers beyond partition count with at-least-once delivery." +--- + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +:::info +Requires Apache Kafka 4.0 or higher with share groups (KIP-932) enabled on the broker. 
+::: + +## Overview + +Kafka share groups (KIP-932) let multiple consumers read from the same partition concurrently. The broker manages per-record acquisition locks and explicit acknowledgement, so consumer count is not capped by partition count, joining or leaving consumers does not pause the group, and a slow record does not block its partition. + +Druid's `ShareGroupIndexTask` consumes from a share group and publishes segments with at-least-once delivery: records are acknowledged only after their segments are atomically registered in the metadata store. + +## When to use share group ingestion + +| Scenario | Consumer group | Share group | +|----------|---------------|-------------| +| Workers needed exceed partition count | Idle workers | All workers active | +| Elastic scaling (auto-scale events) | Rebalancing pause (30-60s) | Zero pause | +| Per-message processing time varies | Head-of-line blocking | Independent processing | +| Ordered processing required per partition | Supported (per-partition order preserved) | Not supported (delivery order not guaranteed) | + +Choose share groups when throughput and elastic scaling matter more than strict per-partition ordering. + +## Task spec + +Submit a `ShareGroupIndexTask` to the Overlord. The spec has no start/end offsets -- the broker tracks delivery state for the share group. 
+ +```json +{ + "type": "index_kafka_share_group", + "dataSchema": { + "dataSource": "my_datasource", + "timestampSpec": { + "column": "__time", + "format": "auto" + }, + "dimensionsSpec": { + "useSchemaDiscovery": true + }, + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "NONE" + } + }, + "ioConfig": { + "topic": "my_topic", + "groupId": "druid-share-group", + "consumerProperties": { + "bootstrap.servers": "kafka-broker:9092" + }, + "inputFormat": { + "type": "json" + }, + "pollTimeout": 2000 + }, + "tuningConfig": { + "type": "KafkaTuningConfig", + "maxRowsPerSegment": 5000000 + } +} +``` + +## IO configuration + +| Property | Type | Required | Default | Description | +|----------|------|----------|---------|-------------| +| `topic` | String | Yes | -- | Kafka topic to consume from. | +| `groupId` | String | Yes | -- | Share group identifier. Multiple tasks with the same `groupId` share the workload. | +| `consumerProperties` | Map | Yes | -- | Kafka consumer properties. Must include `bootstrap.servers`. See [Consumer property restrictions](#consumer-property-restrictions). | +| `inputFormat` | Object | Yes | -- | Input format for parsing records (json, csv, avro, etc.). | +| `pollTimeout` | Long | No | 2000 | Poll timeout in milliseconds. | + +### Consumer property restrictions + +Share consumers (KIP-932) reject some keys that are valid for regular consumer groups. Druid strips the keys below from `consumerProperties`, logging a `WARN` for each stripped key, before constructing the `KafkaShareConsumer`: + +| Stripped key | Why | +|--------------|-----| +| `auto.offset.reset` | Initial position is broker-controlled for share groups (set the group-level `share.auto.offset.reset` instead). | +| `enable.auto.commit` | Share consumers always require explicit `acknowledge()` + `commitSync()`. | +| `group.instance.id` | Share groups do not support static membership. | +| `isolation.level` | Always read-committed for share groups. 
| +| `partition.assignment.strategy` | Broker controls per-record delivery for share groups. | +| `interceptor.classes` | Not supported for share consumers. | +| `session.timeout.ms` | Not a client setting for share consumers; the broker sets the session timeout (`group.share.session.timeout.ms`). | +| `heartbeat.interval.ms` | Not a client setting for share consumers; the broker sets the heartbeat interval (`group.share.heartbeat.interval.ms`). | +| `group.protocol` | Always `SHARE` for share consumers. | +| `group.remote.assignor` | Not applicable to share groups. | + +`share.acknowledgement.mode=explicit` is set automatically and must not be overridden. + +### Tuning configuration + +`tuningConfig` accepts the standard `KafkaTuningConfig` fields. The runner currently honors: + +- `maxRowsInMemory` / `maxBytesInMemory`: trigger a mid-batch persist when the appenderator signals `isPersistRequired`. +- `maxRowsPerSegment`: when reached during a batch the runner logs the event; over-threshold segments are pushed at the end-of-batch publish boundary. + +Mid-batch checkpoint and sequence rollover are not supported. + +## How it works + +1. The task subscribes to the topic with a `KafkaShareConsumer` using the configured `groupId`. +2. The broker delivers batches of records with per-record acquisition locks. +3. Each polled record is parsed by `StreamChunkReader` (the same multi-row parser as `KafkaIndexTask`); a record may produce zero, one, or many `InputRow`s. All resulting rows are added to the appenderator before the record is acknowledged. +4. Parse failures go through `ParseExceptionHandler` (so `maxParseExceptions` is honored). Bytes/processed/unparseable counters are incremented exactly once per row. +5. Segments persist mid-batch on memory pressure and unconditionally at end-of-batch, then publish atomically via `SegmentTransactionalAppendAction`. +6. After a successful publish, every offset in the batch is acknowledged with `ACCEPT` and a `commitSync()` flushes acknowledgements to the broker. +7. 
On task failure or graceful stop before publish, unacknowledged records are redelivered by the broker after the acquisition lock expires. + +## Safety invariants + +1. **ACK after publish:** `ACCEPT` is sent only after the segment is registered in the metadata store. No data loss on task failure. +2. **Multi-row safe:** every row produced from a record is added to the appenderator before that record is acknowledged. +3. **Resource safe:** `Appenderator` and `KafkaShareConsumer` are released on every exit path. +4. **Terminal state:** every polled record reaches exactly one terminal state -- `ACCEPT`, `RELEASE`, or broker redelivery after lock expiry. + +## Graceful stop + +When the Overlord asks a task to stop, the runner calls `KafkaShareConsumer.wakeup()`. The in-flight `poll()` throws `WakeupException`; the runner exits the loop after committing any in-flight batch. Records polled but not yet published remain unacknowledged and are redelivered by the broker after the acquisition lock expires. + +## Acquisition lock duration + +The broker controls the lock via `group.share.record.lock.duration.ms`. The runner logs the effective value once after the first poll: + +``` +Effective broker acquisition lock timeout for share-group[my-group]: 30000 ms +``` + +A single thread does both poll and publish. If one poll-to-publish cycle takes longer than the lock duration, the locks on its in-flight records expire and the broker may redeliver those records, producing duplicates. Tune `pollTimeout`, `maxRowsInMemory`, and `maxRowsPerSegment` so each cycle stays well under the lock window. + +## Scaling + +Tasks with the same `groupId` share the workload automatically; you can run more tasks than partitions: + +``` +Topic: 4 partitions +Tasks with same groupId: 20 +Result: All 20 tasks actively consuming (broker distributes records) +``` + +Adding or removing tasks does not trigger a rebalancing pause. + +## Delivery semantics + +At-least-once. 
On task failure, records between the last committed acknowledgement and the failure point are redelivered, which may produce duplicates across restarts. A deduplication cache is planned. + +## Metrics + +In addition to the standard ingestion metrics (`ingest/events/processed`, `ingest/events/unparseable`, `ingest/persists/count`, etc.), share-group ingestion emits: + +| Metric | Description | +|--------|-------------| +| `ingest/shareGroup/commitFailures` | Per-batch count of partitions whose `commitSync()` failed. A non-zero value means the affected records will be redelivered; alert on sustained non-zero values. | + +## Limitations (current release) + +- Single-threaded ingestion per task; a future enhancement may add a background `RENEW` thread to extend the broker lock for long-running batches. +- No supervisor integration; tasks are submitted manually via the Overlord API. A `KafkaShareGroupSupervisor` is planned as a future enhancement. +- No deduplication cache (at-least-once). +- Delivery order within a partition is not guaranteed. +- Mid-batch checkpoint / sequence rollover is not supported. If a batch grossly exceeds `maxRowsPerSegment` the runner still publishes correctly (multiple segments per batch), but the threshold is only checked at end-of-batch boundaries. 
+ +## Demo: end-to-end validation with Druid UI + +### Prerequisites + +- Java 17 +- Kafka 4.2.0 (with share groups enabled) +- Druid checked out from this repository (built from source) + +### Step 1: Start Kafka with share groups + +```bash +cd kafka_2.13-4.2.0 + +KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" +bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties + +echo "group.share.enable=true" >> config/server.properties +echo "group.share.record.lock.duration.ms=30000" >> config/server.properties + +bin/kafka-server-start.sh config/server.properties +``` + +### Step 2: Create topic and produce messages + +```bash +cd kafka_2.13-4.2.0 + +bin/kafka-topics.sh --create --topic druid-share-test --partitions 4 --bootstrap-server localhost:9092 + +bin/kafka-console-producer.sh --topic druid-share-test --bootstrap-server localhost:9092 Review Comment: [P2] Demo produces records before the share group can read them. The demo creates and produces the sample messages before submitting druid-demo-share-group, but share-group starting position is broker-controlled and this PR strips auto.offset.reset; the tests explicitly set share.auto.offset.reset=earliest for this reason. With the broker default of latest, users following the demo will submit the task after the sample records already exist and ingest zero rows. Move producing after task startup or add a kafka-configs/AdminClient step that sets share.auto.offset.reset=earliest for the demo group before producing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
