FrankChen021 commented on code in PR #19311:
URL: https://github.com/apache/druid/pull/19311#discussion_r3241286739


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaShareGroupRecordSupplier.java:
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
+import org.apache.druid.data.input.kafka.KafkaTopicPartition;
+import org.apache.druid.indexing.seekablestream.common.AcknowledgeType;
+import 
org.apache.druid.indexing.seekablestream.common.AcknowledgingRecordSupplier;
+import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import javax.annotation.Nonnull;
+import javax.validation.constraints.NotNull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Adapts {@link KafkaShareConsumer} to {@link AcknowledgingRecordSupplier}.
+ * Delivery state lives on the broker; the supplier sets {@code group.id} to
+ * the configured share group name.
+ */
+public class KafkaShareGroupRecordSupplier
+    implements AcknowledgingRecordSupplier<KafkaTopicPartition, Long, 
KafkaRecordEntity>
+{
+  private static final Logger log = new 
Logger(KafkaShareGroupRecordSupplier.class);
+
+  private final KafkaShareConsumer<byte[], byte[]> consumer;
+
+  /**
+   * Records returned by the most recent {@link #poll(long)}, retained so
+   * {@link #acknowledge} can pass the original {@link ConsumerRecord} to the
+   * Kafka client. The {@code (topic, partition, offset)} ack variant is 
fragile
+   * once the share-fetch buffer rolls over (KIP-932).
+   */
+  private final Map<RecordKey, ConsumerRecord<byte[], byte[]>> 
deliveredRecords = new HashMap<>();
+  private boolean closed;
+
+  public KafkaShareGroupRecordSupplier(
+      Map<String, Object> consumerProperties,
+      ObjectMapper sortingMapper,
+      String groupId
+  )
+  {
+    this(createShareConsumer(consumerProperties, sortingMapper, groupId));
+  }
+
+  @VisibleForTesting
+  public KafkaShareGroupRecordSupplier(KafkaShareConsumer<byte[], byte[]> 
consumer)
+  {
+    this.consumer = consumer;
+  }
+
+  @Override
+  public void subscribe(Set<String> topics)
+  {
+    consumer.subscribe(topics);
+  }
+
+  @Override
+  public void unsubscribe()
+  {
+    consumer.unsubscribe();
+  }
+
+  @Override
+  public Set<String> subscription()
+  {
+    return consumer.subscription();
+  }
+
+  @NotNull
+  @Override
+  public List<OrderedPartitionableRecord<KafkaTopicPartition, Long, 
KafkaRecordEntity>> poll(long timeoutMs)
+  {
+    deliveredRecords.clear();
+    final List<OrderedPartitionableRecord<KafkaTopicPartition, Long, 
KafkaRecordEntity>> polledRecords =
+        new ArrayList<>();
+    for (ConsumerRecord<byte[], byte[]> record : 
consumer.poll(Duration.ofMillis(timeoutMs))) {
+      deliveredRecords.put(new RecordKey(record.topic(), record.partition(), 
record.offset()), record);
+      polledRecords.add(new OrderedPartitionableRecord<>(
+          record.topic(),
+          new KafkaTopicPartition(true, record.topic(), record.partition()),
+          record.offset(),
+          record.value() == null ? null : ImmutableList.of(new 
KafkaRecordEntity(record)),
+          record.timestamp()
+      ));
+    }
+    return polledRecords;
+  }
+
+  @Override
+  public void acknowledge(KafkaTopicPartition partitionId, Long offset)
+  {
+    acknowledge(partitionId, offset, AcknowledgeType.ACCEPT);
+  }
+
+  @Override
+  public void acknowledge(KafkaTopicPartition partitionId, Long offset, 
AcknowledgeType type)
+  {
+    final String topic = partitionId.topic().orElseThrow(
+        () -> new IllegalArgumentException("Cannot acknowledge record without 
topic")
+    );
+    final ConsumerRecord<byte[], byte[]> record = deliveredRecords.get(
+        new RecordKey(topic, partitionId.partition(), offset)
+    );
+    if (record == null) {
+      throw new IllegalStateException(StringUtils.format(
+          "Cannot acknowledge unknown record at topic[%s] partition[%d] 
offset[%d]; "
+          + "either it was not delivered by the most recent poll() or it has 
already been acknowledged.",
+          topic, partitionId.partition(), offset
+      ));
+    }
+    consumer.acknowledge(record, toKafkaAcknowledgeType(type));
+  }
+
+  @Override
+  public void acknowledge(
+      Map<KafkaTopicPartition, Collection<Long>> offsets,
+      AcknowledgeType type
+  )
+  {
+    final org.apache.kafka.clients.consumer.AcknowledgeType kafkaType = 
toKafkaAcknowledgeType(type);
+    for (Map.Entry<KafkaTopicPartition, Collection<Long>> entry : 
offsets.entrySet()) {
+      final KafkaTopicPartition partition = entry.getKey();
+      final String topic = partition.topic().orElseThrow(
+          () -> new IllegalArgumentException("Cannot acknowledge record 
without topic")
+      );
+      for (Long offset : entry.getValue()) {
+        final ConsumerRecord<byte[], byte[]> record = deliveredRecords.get(
+            new RecordKey(topic, partition.partition(), offset)
+        );
+        if (record == null) {
+          throw new IllegalStateException(StringUtils.format(
+              "Cannot acknowledge unknown record at topic[%s] partition[%d] 
offset[%d]; "
+              + "either it was not delivered by the most recent poll() or it 
has already been acknowledged.",
+              topic, partition.partition(), offset
+          ));
+        }
+        consumer.acknowledge(record, kafkaType);
+      }
+    }
+  }
+
+  @Override
+  public Map<KafkaTopicPartition, Optional<Exception>> commitSync()
+  {
+    final Map<TopicIdPartition, Optional<KafkaException>> result = 
consumer.commitSync();
+    final Map<KafkaTopicPartition, Optional<Exception>> mapped = new 
HashMap<>();
+    for (Map.Entry<TopicIdPartition, Optional<KafkaException>> entry : 
result.entrySet()) {
+      final TopicIdPartition tip = entry.getKey();
+      mapped.put(
+          new KafkaTopicPartition(true, tip.topic(), tip.partition()),
+          entry.getValue().map(e -> (Exception) e)
+      );
+    }
+    return mapped;
+  }
+
+  @Override
+  public Set<KafkaTopicPartition> getPartitionIds(String stream)
+  {
+    // Share consumer does not expose partition assignment; broker manages it.
+    return Collections.emptySet();
+  }
+
+  @Override
+  public void wakeup()
+  {
+    consumer.wakeup();
+  }
+
+  @Override
+  public Optional<Integer> acquisitionLockTimeoutMs()
+  {
+    return consumer.acquisitionLockTimeoutMs();
+  }
+
+  @Override
+  public void close()
+  {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      consumer.close();
+    }
+    catch (Exception e) {
+      log.warn(e, "Exception closing KafkaShareConsumer");
+    }
+  }
+
+  @Nonnull
+  private static org.apache.kafka.clients.consumer.AcknowledgeType 
toKafkaAcknowledgeType(AcknowledgeType type)
+  {
+    switch (type) {
+      case ACCEPT:
+        return org.apache.kafka.clients.consumer.AcknowledgeType.ACCEPT;
+      case RELEASE:
+        return org.apache.kafka.clients.consumer.AcknowledgeType.RELEASE;
+      case REJECT:
+        return org.apache.kafka.clients.consumer.AcknowledgeType.REJECT;
+      case RENEW:
+        return org.apache.kafka.clients.consumer.AcknowledgeType.RENEW;
+      default:
+        throw new IllegalArgumentException("Unknown acknowledge type: " + 
type);
+    }
+  }
+
+  private static KafkaShareConsumer<byte[], byte[]> createShareConsumer(
+      Map<String, Object> consumerProperties,
+      ObjectMapper sortingMapper,
+      String groupId
+  )
+  {
+    final Map<String, Object> sanitized = 
ShareGroupConsumerProperties.sanitize(consumerProperties);

Review Comment:
   [P2] Unsupported share-consumer configs can still enter through dynamic 
config
   
   sanitize() runs before KafkaRecordSupplier.addConsumerPropertiesFromConfig, 
but that helper expands druid.dynamic.config.provider afterward and copies all 
returned entries directly into the Properties. If a dynamic provider supplies a 
forbidden key such as enable.auto.commit, auto.offset.reset, or 
group.instance.id, KafkaShareConsumer still receives it and the task can fail 
at startup despite the documented stripping behavior. Sanitize after dynamic 
expansion as well, or expand into a map first and then strip unsupported keys 
once.



##########
docs/ingestion/kafka-share-group-ingestion.md:
##########
@@ -0,0 +1,337 @@
+---
+id: kafka-share-group-ingestion
+title: "Kafka share group ingestion"
+sidebar_label: "Kafka share group ingestion"
+description: "Queue-semantics ingestion from Apache Kafka using share groups 
(KIP-932). Scale consumers beyond partition count with at-least-once delivery."
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+:::info
+Requires Apache Kafka 4.0 or higher with share groups (KIP-932) enabled on the 
broker.
+:::
+
+## Overview
+
+Kafka share groups (KIP-932) let multiple consumers read from the same 
partition concurrently. The broker manages per-record acquisition locks and 
explicit acknowledgement, so consumer count is not capped by partition count, 
joining or leaving consumers does not pause the group, and a slow record does 
not block its partition.
+
+Druid's `ShareGroupIndexTask` consumes from a share group and publishes 
segments with at-least-once delivery: records are acknowledged only after their 
segments are atomically registered in the metadata store.
+
+## When to use share group ingestion
+
+| Scenario | Consumer group | Share group |
+|----------|---------------|-------------|
+| Workers needed exceed partition count | Idle workers | All workers active |
+| Elastic scaling (auto-scale events) | Rebalancing pause (30-60s) | Zero 
pause |
+| Per-message processing time varies | Head-of-line blocking | Independent 
processing |
+| Ordered processing required per partition | Yes | No (delivery order not 
guaranteed) |
+
+Choose share groups when throughput and elastic scaling matter more than 
strict per-partition ordering.
+
+## Task spec
+
+Submit a `ShareGroupIndexTask` to the Overlord. There are no start/end offsets 
-- the broker tracks them.
+
+```json
+{
+  "type": "index_kafka_share_group",
+  "dataSchema": {
+    "dataSource": "my_datasource",
+    "timestampSpec": {
+      "column": "__time",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "useSchemaDiscovery": true
+    },
+    "granularitySpec": {
+      "segmentGranularity": "DAY",
+      "queryGranularity": "NONE"
+    }
+  },
+  "ioConfig": {
+    "topic": "my_topic",
+    "groupId": "druid-share-group",
+    "consumerProperties": {
+      "bootstrap.servers": "kafka-broker:9092"
+    },
+    "inputFormat": {
+      "type": "json"
+    },
+    "pollTimeout": 2000
+  },
+  "tuningConfig": {
+    "type": "KafkaTuningConfig",
+    "maxRowsPerSegment": 5000000
+  }
+}
+```
+
+## IO configuration
+
+| Property | Type | Required | Default | Description |
+|----------|------|----------|---------|-------------|
+| `topic` | String | Yes | -- | Kafka topic to consume from. |
+| `groupId` | String | Yes | -- | Share group identifier. Multiple tasks with 
the same `groupId` share the workload. |
+| `consumerProperties` | Map | Yes | -- | Kafka consumer properties. Must 
include `bootstrap.servers`. See [Consumer property 
restrictions](#consumer-property-restrictions). |
+| `inputFormat` | Object | Yes | -- | Input format for parsing records (json, 
csv, avro, etc.). |
+| `pollTimeout` | Long | No | 2000 | Poll timeout in milliseconds. |
+
+### Consumer property restrictions
+
+Share consumers (KIP-932) reject some keys that are valid for regular consumer groups. Druid strips the keys below from `consumerProperties` before constructing the `KafkaShareConsumer`, logging a `WARN` for each key it removes:
+
+| Stripped key | Why |
+|--------------|-----|
+| `auto.offset.reset` | The starting position is configured on the share group, not the consumer: set the group config `share.auto.offset.reset` (default `latest`). |
+| `enable.auto.commit` | Share consumers always require explicit 
`acknowledge()` + `commitSync()`. |
+| `group.instance.id` | Share groups do not support static membership. |
+| `isolation.level` | Isolation level is a share-group setting (`share.isolation.level`), not a consumer setting. |
+| `partition.assignment.strategy` | Broker controls per-record delivery for 
share groups. |
+| `interceptor.classes` | Not supported for share consumers. |
+| `session.timeout.ms` | The broker controls share-group session timeouts (`group.share.session.timeout.ms`). |
+| `heartbeat.interval.ms` | The broker controls share-group heartbeat intervals (`group.share.heartbeat.interval.ms`). |
+| `group.protocol` | Always `SHARE` for share consumers. |
+| `group.remote.assignor` | Not applicable to share groups. |
+
+`share.acknowledgement.mode=explicit` is set automatically and must not be 
overridden.
+
+### Tuning configuration
+
+`tuningConfig` accepts the standard `KafkaTuningConfig` fields. The runner 
currently honors:
+
+- `maxRowsInMemory` / `maxBytesInMemory`: triggers a mid-batch persist when 
the appenderator signals `isPersistRequired`.
+- `maxRowsPerSegment`: when reached during a batch the runner logs the event; 
over-threshold segments are pushed at the end-of-batch publish boundary.
+
+Mid-batch checkpoint and sequence rollover are not supported.
+
+## How it works
+
+1. The task subscribes to the topic with a `KafkaShareConsumer` using the 
configured `groupId`.
+2. The broker delivers batches of records with per-record acquisition locks.
+3. Each polled record is parsed by `StreamChunkReader` (the same multi-row 
parser as `KafkaIndexTask`); a record may produce zero, one, or many 
`InputRow`s. All resulting rows are added to the appenderator before the record 
is acknowledged.
+4. Parse failures go through `ParseExceptionHandler` (so `maxParseExceptions` 
is honored). Bytes/processed/unparseable counters are incremented exactly once 
per row.
+5. Segments persist mid-batch on memory pressure and unconditionally at 
end-of-batch, then publish atomically via `SegmentTransactionalAppendAction`.
+6. After a successful publish, every offset in the batch is acknowledged with 
`ACCEPT` and a `commitSync()` flushes acknowledgements to the broker.
+7. On task failure or graceful stop before publish, unacknowledged records are 
redelivered by the broker after the acquisition lock expires.
+
+## Safety invariants
+
+1. **ACK after publish:** `ACCEPT` is sent only after the segment is 
registered in the metadata store. No data loss on task failure.
+2. **Multi-row safe:** every row produced from a record is added to the 
appenderator before that record is acknowledged.
+3. **Resource safe:** `Appenderator` and `KafkaShareConsumer` are released on 
every exit path.
+4. **Terminal state:** every polled record reaches exactly one terminal state 
-- `ACCEPT`, `RELEASE`, or broker redelivery after lock expiry.
+
+## Graceful stop
+
+When the Overlord asks a task to stop, the runner calls 
`KafkaShareConsumer.wakeup()`. The in-flight `poll()` throws `WakeupException`; 
the runner exits the loop after committing any in-flight batch. Records polled 
but not yet published remain unacknowledged and are redelivered by the broker 
after the acquisition lock expires.
+
+## Acquisition lock duration
+
+The broker controls the lock via `group.share.record.lock.duration.ms`. The 
runner logs the effective value once after the first poll:
+
+```
+Effective broker acquisition lock timeout for share-group[my-group]: 30000 ms
+```
+
+A single thread does both poll and publish. If a batch exceeds the lock 
duration, in-flight records may be redelivered (duplicates). Tune 
`pollTimeout`, `maxRowsInMemory`, and `maxRowsPerSegment` so each cycle stays 
well under the lock window.
+
+## Scaling
+
+Tasks with the same `groupId` share the workload automatically; you can run 
more tasks than partitions:
+
+```
+Topic: 4 partitions
+Tasks with same groupId: 20
+Result: All 20 tasks actively consuming (broker distributes records)
+```
+
+Adding or removing tasks does not trigger a rebalancing pause.
+
+## Delivery semantics
+
+At-least-once. On task failure, records between the last committed 
acknowledgement and the failure point are redelivered, which may produce 
duplicates across restarts. A deduplication cache is planned.
+
+## Metrics
+
+In addition to the standard ingestion metrics (`ingest/events/processed`, 
`ingest/events/unparseable`, `ingest/persists/count`, etc.), share-group 
ingestion emits:
+
+| Metric | Description |
+|--------|-------------|
+| `ingest/shareGroup/commitFailures` | Per-batch count of partitions whose 
`commitSync()` failed. A non-zero value means the affected records will be 
redelivered; alert on sustained non-zero values. |
+
+## Limitations (current release)
+
+- Single-threaded ingestion per task; a future enhancement may add a 
background `RENEW` thread to extend the broker lock for long-running batches.
+- No supervisor integration; tasks are submitted manually via the Overlord 
API. A `KafkaShareGroupSupervisor` is planned as a future enhancement.
+- No deduplication cache (at-least-once).
+- Delivery order within a partition is not guaranteed.
+- Mid-batch checkpoint / sequence rollover is not supported. If a batch 
grossly exceeds `maxRowsPerSegment` the runner still publishes correctly 
(multiple segments per batch), but the threshold is only checked at 
end-of-batch boundaries.
+
+## Demo: end-to-end validation with Druid UI
+
+### Prerequisites
+
+- Java 17
+- Kafka 4.2.0 (with share groups enabled)
+- Druid checked out from this repository (built from source)
+
+### Step 1: Start Kafka with share groups
+
+```bash
+cd kafka_2.13-4.2.0
+
+KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
+bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c 
config/server.properties
+
+echo "group.share.enable=true" >> config/server.properties
+echo "group.share.record.lock.duration.ms=30000" >> config/server.properties
+
+bin/kafka-server-start.sh config/server.properties
+```
+
+### Step 2: Create topic and produce messages
+
+```bash
+cd kafka_2.13-4.2.0
+
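+bin/kafka-topics.sh --create --topic druid-share-test --partitions 4 --bootstrap-server localhost:9092
+
+# Share groups start from the latest offset by default. Start the demo group from the beginning
+# so the records produced below are ingested even though the task is submitted afterwards.
+bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type groups \
+  --entity-name druid-demo-share-group --add-config share.auto.offset.reset=earliest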
+
+bin/kafka-console-producer.sh --topic druid-share-test --bootstrap-server 
localhost:9092

Review Comment:
   [P2] Demo produces records before the share group can read them
   
   The demo creates and produces the sample messages before submitting 
druid-demo-share-group, but share-group starting position is broker-controlled 
and this PR strips auto.offset.reset; the tests explicitly set 
share.auto.offset.reset=earliest for this reason. With the broker default of 
latest, users following the demo will submit the task after the sample records 
already exist and ingest zero rows. Move producing after task startup or add a 
kafka-configs/AdminClient step that sets share.auto.offset.reset=earliest for 
the demo group before producing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to