Shekhar Prasad Rajak created KAFKA-19883:
--------------------------------------------
Summary: Support Hierarchical Transactional Acknowledgments for Share Groups
Key: KAFKA-19883
URL: https://issues.apache.org/jira/browse/KAFKA-19883
Project: Kafka
Issue Type: New Feature
Components: clients, consumer, core
Affects Versions: 4.1.0
Reporter: Shekhar Prasad Rajak
Add transactional acknowledgment support to Kafka Share Groups to enable
+*exactly-once read semantics*+ in distributed stream processing engines like
Apache Spark and Apache Flink.
*Current Behavior:*
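Share Groups currently support only non-transactional acknowledgment: once a consumer calls acknowledge() and the acknowledgment is committed (on the next commitSync(), commitAsync() or poll() call), the records are permanently acknowledged and the acknowledgment cannot be undone. This can lose data in distributed streaming frameworks:
1. A worker polls records and acknowledges them to Kafka
2. The records are marked as acknowledged and are never redelivered to the share group
3. The checkpoint/batch fails before the records are written to the sink
4. The records are lost (acknowledged, but never written to the sink)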
This prevents exactly-once semantics in Spark/Flink with Share Groups.
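The failure sequence above can be reproduced with a small in-memory model. This is a hypothetical sketch, not Kafka code: `ImmediateAckDataLoss`, `SharePartitionModel` and the `State` values are invented for illustration, and the model ignores acquisition locks, delivery counts and the batching of acknowledgments into the next commit or poll.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ImmediateAckDataLoss {

    // Hypothetical, simplified record states. Real share partitions also track
    // delivery counts and acquisition-lock timeouts.
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    // Minimal model of one share partition: offset -> state.
    static final class SharePartitionModel {
        final Map<Long, State> states = new LinkedHashMap<>();

        SharePartitionModel(int records) {
            for (long offset = 0; offset < records; offset++) {
                states.put(offset, State.AVAILABLE);
            }
        }

        // Hands out every AVAILABLE record and marks it ACQUIRED.
        List<Long> poll() {
            List<Long> acquired = new ArrayList<>();
            for (Map.Entry<Long, State> e : states.entrySet()) {
                if (e.getValue() == State.AVAILABLE) {
                    e.setValue(State.ACQUIRED);
                    acquired.add(e.getKey());
                }
            }
            return acquired;
        }

        // Non-transactional acknowledgment: final, with no way to undo it.
        void acknowledge(long offset) {
            states.put(offset, State.ACKNOWLEDGED);
        }
    }

    // Runs steps 1-4 and returns the offsets that never reached the sink
    // and can no longer be redelivered.
    static List<Long> runScenario(int records, boolean checkpointFails) {
        SharePartitionModel partition = new SharePartitionModel(records);
        List<Long> sink = new ArrayList<>();

        List<Long> batch = partition.poll();        // 1. worker polls
        for (long offset : batch) {
            partition.acknowledge(offset);           // 1-2. acknowledged for good
        }
        if (!checkpointFails) {
            sink.addAll(batch);                      // 3. on failure nothing is written
        }

        List<Long> lost = new ArrayList<>();
        for (long offset : batch) {
            boolean redeliverable = partition.states.get(offset) == State.AVAILABLE;
            if (!sink.contains(offset) && !redeliverable) {
                lost.add(offset);                    // 4. acknowledged, never written
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        System.out.println("lost on failure: " + runScenario(3, true));
        System.out.println("lost on success: " + runScenario(3, false));
    }
}
```

With `checkpointFails` set, every polled offset ends up acknowledged but absent from the sink, and because its state is `ACKNOWLEDGED` a further `poll()` in the model would never hand it out again.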
*Expected Behavior:*
Share Groups should support a transactional acknowledgment mode in which:
1. Records are acknowledged within transactions
2. Acknowledgments remain pending until the transaction commits
3. A transaction rollback makes ALL records in the transaction available for retry
4. Acknowledgments can be committed atomically with framework checkpoints/batches
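One way to picture the expected behavior is as an extra per-record state between acquisition and final acknowledgment. The sketch below assumes a hypothetical `PENDING_ACK` state and a single open transaction per model instance; neither the state name nor the `PendingAckModel` class comes from Kafka or from an implementation of this proposal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PendingAckModel {

    // Hypothetical states; PENDING_ACK marks records acknowledged inside an
    // open transaction whose outcome is not yet decided.
    enum State { AVAILABLE, ACQUIRED, PENDING_ACK, ACKNOWLEDGED }

    final Map<Long, State> states = new HashMap<>();
    // Offsets acknowledged inside the currently open transaction.
    private final List<Long> pending = new ArrayList<>();

    PendingAckModel(int records) {
        for (long offset = 0; offset < records; offset++) {
            states.put(offset, State.ACQUIRED); // assume a worker already holds them
        }
    }

    // 1-2. An acknowledgment inside a transaction is only recorded as pending.
    void acknowledgeInTxn(long offset) {
        if (states.get(offset) != State.ACQUIRED) {
            throw new IllegalStateException("record " + offset + " is not acquired");
        }
        states.put(offset, State.PENDING_ACK);
        pending.add(offset);
    }

    // Commit makes every pending acknowledgment final.
    void commit() {
        for (long offset : pending) {
            states.put(offset, State.ACKNOWLEDGED);
        }
        pending.clear();
    }

    // 3. Rollback releases ALL records acknowledged in the transaction for retry.
    void rollback() {
        for (long offset : pending) {
            states.put(offset, State.AVAILABLE);
        }
        pending.clear();
    }

    long count(State s) {
        return states.values().stream().filter(v -> v == s).count();
    }

    public static void main(String[] args) {
        PendingAckModel failed = new PendingAckModel(3);
        for (long o = 0; o < 3; o++) failed.acknowledgeInTxn(o);
        failed.rollback(); // checkpoint failed
        System.out.println("available after rollback: " + failed.count(State.AVAILABLE));

        PendingAckModel ok = new PendingAckModel(3);
        for (long o = 0; o < 3; o++) ok.acknowledgeInTxn(o);
        ok.commit(); // checkpoint succeeded
        System.out.println("acknowledged after commit: " + ok.count(State.ACKNOWLEDGED));
    }
}
```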
*Proposed Solution:*
Implement hierarchical two-phase commit transactions:
{quote} BatchTransaction (coordinator/driver level)
├─ TaskTransaction1 (worker/executor level)
│ ├─ acknowledge(record1, ACCEPT)
│ ├─ acknowledge(record2, ACCEPT)
│ └─ prepare() ← Phase 1
│
├─ TaskTransaction2 (worker/executor level)
│ ├─ acknowledge(record3, ACCEPT)
│ └─ prepare() ← Phase 1
│
└─ commit() ← Phase 2: Atomically commits all task transactions
{quote}
- Batch transactions (coordinator-level)
- Task transactions (worker-level)
- Records are not finally acknowledged until the transaction commits
- Rollback on failure → no data loss
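The hierarchy above follows the classic two-phase commit pattern: tasks vote in phase 1 by preparing, and the batch coordinator commits only if every task voted yes, otherwise it rolls all of them back. A minimal in-memory sketch, assuming hypothetical classes named after the proposal's `BatchTransaction` and `TaskTransaction`, with no broker, network or persistence involved:

```java
import java.util.ArrayList;
import java.util.List;

final class HierarchicalTwoPhaseCommit {

    enum Status { OPEN, PREPARED, COMMITTED, ROLLED_BACK }

    // Worker-level transaction: collects acknowledgments, then votes in phase 1.
    static final class TaskTransaction {
        final String taskId;
        final List<Long> acknowledged = new ArrayList<>();
        Status status = Status.OPEN;

        TaskTransaction(String taskId) { this.taskId = taskId; }

        void acknowledge(long offset) {
            if (status != Status.OPEN) throw new IllegalStateException(taskId + " is " + status);
            acknowledged.add(offset);
        }

        // Phase 1: the task declares itself ready to commit.
        void prepare() {
            if (status != Status.OPEN) throw new IllegalStateException(taskId + " is " + status);
            status = Status.PREPARED;
        }
    }

    // Coordinator-level transaction: owns the task transactions and decides the outcome.
    static final class BatchTransaction {
        final String batchId;
        final List<TaskTransaction> tasks = new ArrayList<>();
        Status status = Status.OPEN;

        BatchTransaction(String batchId) { this.batchId = batchId; }

        TaskTransaction beginTask(String taskId) {
            TaskTransaction task = new TaskTransaction(taskId);
            tasks.add(task);
            return task;
        }

        // Phase 2: commit every task if all prepared, otherwise roll back every task.
        boolean commit() {
            if (status != Status.OPEN) throw new IllegalStateException(batchId + " is " + status);
            boolean allPrepared = tasks.stream().allMatch(t -> t.status == Status.PREPARED);
            Status outcome = allPrepared ? Status.COMMITTED : Status.ROLLED_BACK;
            for (TaskTransaction t : tasks) t.status = outcome;
            status = outcome;
            return allPrepared;
        }
    }

    public static void main(String[] args) {
        BatchTransaction batch = new BatchTransaction("batch-1");
        TaskTransaction t1 = batch.beginTask("task-1");
        t1.acknowledge(1); t1.acknowledge(2); t1.prepare();
        TaskTransaction t2 = batch.beginTask("task-2");
        t2.acknowledge(3); t2.prepare();
        System.out.println("committed: " + batch.commit());
    }
}
```

A real implementation would also have to persist the coordinator's commit/rollback decision before phase 2, so that a coordinator crash between the phases cannot leave prepared task transactions in doubt.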
*API Changes:*
New Consumer API:
{code:java}
// Batch-level transaction
BatchTransaction beginBatchTransaction(String batchTransactionId);
void commitBatchTransaction(BatchTransaction txn, Duration timeout)
    throws TimeoutException, TransactionException;
void rollbackBatchTransaction(BatchTransaction txn);

// Task-level transaction
TaskTransaction beginTaskTransaction(BatchTransaction parent, String taskId);
void acknowledge(TaskTransaction txn, ConsumerRecord<K, V> record,
                 AcknowledgeType type);
void prepareTaskTransaction(TaskTransaction txn);
void commitTaskTransaction(TaskTransaction txn);
void rollbackTaskTransaction(TaskTransaction txn);
{code}
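A driver could call the proposed methods roughly as follows. This sketch codes against hypothetical stand-in interfaces that mirror a subset of the signatures above: `commitTaskTransaction`/`rollbackTaskTransaction` are omitted, records are reduced to offsets, and exceptions are assumed unchecked like Kafka's own `TimeoutException`. `FakeConsumer` is an in-memory test double, not a Kafka class, and tasks run sequentially here whereas Spark or Flink would run them on separate workers.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ProposedApiUsage {

    // Hypothetical stand-ins for the proposed types; none of these exist in Kafka.
    interface BatchTransaction { String id(); }
    interface TaskTransaction { String taskId(); }
    // Local stand-in mirroring ACCEPT/RELEASE/REJECT from Kafka's AcknowledgeType.
    enum AcknowledgeType { ACCEPT, RELEASE, REJECT }

    // Subset of the proposed methods; records are reduced to their offsets.
    interface TransactionalShareConsumer {
        BatchTransaction beginBatchTransaction(String batchTransactionId);
        void commitBatchTransaction(BatchTransaction txn, Duration timeout);
        void rollbackBatchTransaction(BatchTransaction txn);
        TaskTransaction beginTaskTransaction(BatchTransaction parent, String taskId);
        void acknowledge(TaskTransaction txn, long offset, AcknowledgeType type);
        void prepareTaskTransaction(TaskTransaction txn);
    }

    // Each task writes, acknowledges and prepares (phase 1); the driver then
    // commits the batch (phase 2) or rolls it back if anything throws.
    static boolean runBatch(TransactionalShareConsumer consumer, String batchId,
                            Map<String, List<Long>> offsetsByTask, List<Long> sink) {
        BatchTransaction batch = consumer.beginBatchTransaction(batchId);
        try {
            for (Map.Entry<String, List<Long>> task : offsetsByTask.entrySet()) {
                TaskTransaction txn = consumer.beginTaskTransaction(batch, task.getKey());
                for (long offset : task.getValue()) {
                    sink.add(offset); // placeholder sink write, not undone on rollback
                    consumer.acknowledge(txn, offset, AcknowledgeType.ACCEPT);
                }
                consumer.prepareTaskTransaction(txn);
            }
            // 30 s matches share.transaction.commit.timeout.ms=30000
            consumer.commitBatchTransaction(batch, Duration.ofSeconds(30));
            return true;
        } catch (RuntimeException e) {
            consumer.rollbackBatchTransaction(batch); // pending acks discarded
            return false;
        }
    }

    // In-memory fake that logs calls and throws when acknowledging failOffset.
    static final class FakeConsumer implements TransactionalShareConsumer {
        final List<String> calls = new ArrayList<>();
        final long failOffset;

        FakeConsumer(long failOffset) { this.failOffset = failOffset; }

        public BatchTransaction beginBatchTransaction(String id) {
            calls.add("beginBatch");
            return () -> id;
        }
        public void commitBatchTransaction(BatchTransaction txn, Duration timeout) { calls.add("commitBatch"); }
        public void rollbackBatchTransaction(BatchTransaction txn) { calls.add("rollbackBatch"); }
        public TaskTransaction beginTaskTransaction(BatchTransaction parent, String taskId) {
            calls.add("beginTask:" + taskId);
            return () -> taskId;
        }
        public void acknowledge(TaskTransaction txn, long offset, AcknowledgeType type) {
            if (offset == failOffset) throw new IllegalStateException("simulated task failure");
            calls.add("ack:" + offset);
        }
        public void prepareTaskTransaction(TaskTransaction txn) { calls.add("prepare:" + txn.taskId()); }
    }

    public static void main(String[] args) {
        Map<String, List<Long>> work = new LinkedHashMap<>();
        work.put("task-1", List.of(1L, 2L));
        work.put("task-2", List.of(3L));
        FakeConsumer ok = new FakeConsumer(-1);
        System.out.println(runBatch(ok, "batch-1", work, new ArrayList<>()) + " " + ok.calls);
        FakeConsumer failing = new FakeConsumer(3);
        System.out.println(runBatch(failing, "batch-2", work, new ArrayList<>()) + " " + failing.calls);
    }
}
```

The sink write in `runBatch` is only a placeholder: for end-to-end exactly-once, the sink itself must also be transactional or idempotent, because a rollback here makes records available again even though their output may already have been written.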
*New Configuration:*
{noformat}
# Default for share.acknowledgement.mode: explicit
share.acknowledgement.mode=transactional
share.batch.transaction.timeout.ms=300000
share.task.transaction.timeout.ms=120000
share.transaction.commit.timeout.ms=30000
share.transaction.auto.rollback=true
{noformat}
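The proposed settings could be read and sanity-checked along these lines. `ShareTxnConfig` is hypothetical, its defaults are the values listed above, and two rules are assumptions rather than part of the proposal: a task timeout may not exceed the batch timeout, and `share.transaction.auto.rollback=true` rolls back a task transaction that is still unprepared after `share.task.transaction.timeout.ms`.

```java
import java.util.Properties;
import java.util.Set;

final class ShareTxnConfig {

    // "implicit" and "explicit" are the existing modes; "transactional" is proposed.
    static final Set<String> MODES = Set.of("implicit", "explicit", "transactional");

    final String mode;
    final long batchTimeoutMs;
    final long taskTimeoutMs;
    final long commitTimeoutMs;
    final boolean autoRollback;

    // Reads the proposed keys, falling back to the defaults listed in the proposal.
    ShareTxnConfig(Properties p) {
        mode = p.getProperty("share.acknowledgement.mode", "explicit");
        batchTimeoutMs = Long.parseLong(p.getProperty("share.batch.transaction.timeout.ms", "300000"));
        taskTimeoutMs = Long.parseLong(p.getProperty("share.task.transaction.timeout.ms", "120000"));
        commitTimeoutMs = Long.parseLong(p.getProperty("share.transaction.commit.timeout.ms", "30000"));
        autoRollback = Boolean.parseBoolean(p.getProperty("share.transaction.auto.rollback", "true"));

        if (!MODES.contains(mode)) {
            throw new IllegalArgumentException("unknown share.acknowledgement.mode: " + mode);
        }
        // Assumed invariant: a task transaction must not outlive its parent batch.
        if (taskTimeoutMs > batchTimeoutMs) {
            throw new IllegalArgumentException("task timeout exceeds batch timeout");
        }
    }

    // Assumed semantics: an unprepared task transaction older than the task
    // timeout is rolled back instead of being left pending.
    boolean shouldAutoRollback(long taskStartMs, long nowMs, boolean prepared) {
        return autoRollback && !prepared && nowMs - taskStartMs > taskTimeoutMs;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("share.acknowledgement.mode", "transactional");
        ShareTxnConfig cfg = new ShareTxnConfig(p);
        System.out.println(cfg.mode + " task=" + cfg.taskTimeoutMs + "ms batch=" + cfg.batchTimeoutMs + "ms");
        System.out.println("roll back after 150s: " + cfg.shouldAutoRollback(0, 150_000, false));
    }
}
```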
*Target Use Cases:* Apache Spark Structured Streaming, Apache Flink exactly-once checkpoints, and any other coordinator-worker streaming framework.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)