Shekhar Prasad Rajak created KAFKA-19883:
--------------------------------------------

             Summary: Support Hierarchical Transactional Acknowledgments for 
Share Groups
                 Key: KAFKA-19883
                 URL: https://issues.apache.org/jira/browse/KAFKA-19883
             Project: Kafka
          Issue Type: New Feature
          Components: clients, consumer, core
    Affects Versions: 4.1.0
            Reporter: Shekhar Prasad Rajak


Add transactional acknowledgment support to Kafka Share Groups to enable 
+*exactly-once read semantics*+ in distributed stream processing engines like 
Apache Spark and Apache Flink.

 

 *Current Behavior:*
  Share Groups currently support only an immediate acknowledgment mode: once a 
record is acknowledged via consumer.acknowledge() and the acknowledgment is 
committed, it is permanently removed from the share group. This creates data 
loss scenarios in distributed streaming frameworks:

  1. Worker polls and acknowledges records to Kafka
  2. Records are permanently removed from Kafka
  3. Checkpoint/batch fails before writing to sink
  4. Records are lost (acknowledged but never reached sink)

  This prevents exactly-once semantics in Spark/Flink with Share Groups.
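
  For illustration, a minimal sketch of this failure sequence against the 
existing KIP-932 share consumer API (the writeToSink/checkpoint calls are 
hypothetical framework placeholders):
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;

public class ImmediateAckLoss {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "spark-share-group");           // share group name
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("share.acknowledgement.mode", "explicit");  // per-record acks

        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
            consumer.subscribe(List.of("events"));

            // 1. Worker polls and acknowledges records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                consumer.acknowledge(record, AcknowledgeType.ACCEPT);
            }

            // 2. Acknowledgments are committed: the records are permanently
            //    removed from the share group
            consumer.commitSync();

            // 3. Checkpoint/batch fails before the sink write completes
            // writeToSink(records);   <-- throws here (hypothetical call)
            // checkpoint();           //                (hypothetical call)

            // 4. The already-acknowledged records cannot be redelivered: data loss
        }
    }
}
{code}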

  *Expected Behavior:*
  Share Groups should support transactional acknowledgment mode where:

  1. Records are acknowledged within transactions
  2. Acknowledgments are pending until transaction commits
  3. Transaction rollback makes ALL records available for retry
  4. Acknowledgments can be coordinated atomically with framework 
checkpoints/batches

  *Proposed Solution:*

  Implement hierarchical two-phase commit transactions:
{quote}  BatchTransaction (coordinator/driver level)
    ├─ TaskTransaction1 (worker/executor level)
    │    ├─ acknowledge(record1, ACCEPT)
    │    ├─ acknowledge(record2, ACCEPT)
    │    └─ prepare() ← Phase 1
    │
    ├─ TaskTransaction2 (worker/executor level)
    │    ├─ acknowledge(record3, ACCEPT)
    │    └─ prepare() ← Phase 1
    │
    └─ commit() ← Phase 2: Atomically commits all task transactions
{quote}
 

  - Batch transactions (coordinator-level)
  - Task transactions (worker-level)
  - Records stay in Kafka until transaction commits
  - Rollback on failure → no data loss

 

  *API Changes:*

  New Consumer API:
  // Batch-level transaction
  BatchTransaction beginBatchTransaction(String batchTransactionId);
  void commitBatchTransaction(BatchTransaction txn, Duration timeout) 
      throws TimeoutException, TransactionException;
  void rollbackBatchTransaction(BatchTransaction txn);

  // Task-level transaction
  TaskTransaction beginTaskTransaction(BatchTransaction parent, String taskId);
  void acknowledge(TaskTransaction txn, ConsumerRecord<K,V> record, 
AcknowledgeType type);
  void prepareTaskTransaction(TaskTransaction txn);
  void commitTaskTransaction(TaskTransaction txn);
  void rollbackTaskTransaction(TaskTransaction txn);
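
  A worker/executor-side usage sketch of these proposed methods (nothing below 
exists in Kafka today; batchTxn is assumed to have been started by the 
coordinator and handed to the worker):
{code:java}
TaskTransaction taskTxn = consumer.beginTaskTransaction(batchTxn, "task-7");
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        // process the record, then acknowledge it inside the task transaction;
        // the acknowledgment stays pending until the batch transaction commits
        consumer.acknowledge(taskTxn, record, AcknowledgeType.ACCEPT);
    }
    consumer.prepareTaskTransaction(taskTxn);   // Phase 1: acks staged, not applied
} catch (Exception e) {
    consumer.rollbackTaskTransaction(taskTxn);  // records return to the share group
}
{code}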

  *New Configuration:*
  share.acknowledgement.mode=transactional  # Default: explicit
  share.batch.transaction.timeout.ms=300000
  share.task.transaction.timeout.ms=120000
  share.transaction.commit.timeout.ms=30000
  share.transaction.auto.rollback=true
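
  For illustration, a sketch of a share consumer configured for the proposed 
mode (only share.acknowledgement.mode exists today; the transactional value and 
the remaining keys are what this ticket proposes):
{code:java}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "flink-share-group");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

// Proposed settings from this ticket
props.put("share.acknowledgement.mode", "transactional");
props.put("share.batch.transaction.timeout.ms", "300000");
props.put("share.task.transaction.timeout.ms", "120000");
props.put("share.transaction.commit.timeout.ms", "30000");
props.put("share.transaction.auto.rollback", "true");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
{code}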

 

*Target Use Cases:* Apache Spark Structured Streaming, Apache Flink 
exactly-once checkpoints, and any coordinator-worker streaming framework.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
