wzhramc opened a new issue, #24588: URL: https://github.com/apache/pulsar/issues/24588
### Search before reporting - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Motivation ## Problem When implementing custom Pulsar sinks with transactions to the cluster itself, there's no way to include the input message acknowledgment within the transaction scope. The Record.ack() method operates outside of any transaction. I need it because I'm working on a Pulsar sink that needs to process incoming messages and produce to multiple output topics atomically. ### Example ```java @Override public void write(Record<String> record) { Transaction txn = client.newTransaction().build().get(); // Send to output topics within transaction eventProducer.newMessage(txn).value(data).send(); txn.commit().get(); // This happens outside the transaction record.ack(); } ``` ### Proposed Solution Expose the underlying consumer in SinkContext: ```java public interface SinkContext { ... Consumer<?> getConsumer(); } ``` This would enable: ```java @Override public void write(Record<String> record) { Transaction txn = client.newTransaction().build().get(); eventProducer.newMessage(txn).value(data).send(); // Include acknowledgment in the transaction Consumer<?> consumer = sinkContext.getConsumer(); consumer.acknowledgeAsync(record.getMessageId(), txn).get(); txn.commit().get(); } ``` ### Solution _No response_ ### Alternatives _No response_ ### Anything else? _No response_ ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org