wzhramc opened a new issue, #24588:
URL: https://github.com/apache/pulsar/issues/24588

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   ## Problem
   
   When a custom Pulsar sink uses transactions to write back to the same
   Pulsar cluster, there is no way to include the acknowledgment of the input
   message in the transaction. `Record.ack()` always operates outside of any
   transaction, so a failure between the commit and the ack can cause the
   input to be redelivered and the outputs to be produced twice. I need this
   because I'm working on a Pulsar sink that must consume each incoming
   message and produce to multiple output topics atomically.
   
   ### Example
   
   ```java
   @Override
   public void write(Record<String> record) throws Exception {
       Transaction txn = client.newTransaction().build().get();
       
       // Send to output topics within transaction
       String data = record.getValue();
       eventProducer.newMessage(txn).value(data).send();
       
       txn.commit().get();
       
       // This happens outside the transaction
       record.ack();
   }
   ```
   
   ### Proposed Solution
   Expose the underlying consumer in SinkContext:
   
   ```java
   public interface SinkContext {
       ...
       Consumer<?> getConsumer();
   }
   ```
   
   This would enable:
   ```java
   @Override
   public void write(Record<String> record) throws Exception {
       Transaction txn = client.newTransaction().build().get();
       
       String data = record.getValue();
       eventProducer.newMessage(txn).value(data).send();
       
       // Include acknowledgment in the transaction
       Consumer<?> consumer = sinkContext.getConsumer();
       MessageId messageId = record.getMessage().orElseThrow().getMessageId();
       consumer.acknowledgeAsync(messageId, txn).get();
       
       txn.commit().get();
   }
   ```
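   
   To make the difference concrete, here is a toy in-memory model of the two
   variants. It is only a sketch and does not use any Pulsar API: `ToyTxn`,
   `AckWindowModel` and `outputsPublished` are hypothetical names, and the
   "crash" is simulated by restarting the loop right after the first commit.
   It assumes the input is redelivered until it has been acknowledged.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Toy model (not Pulsar code) of a sink that crashes right after a commit. */
   public class AckWindowModel {

       /** Minimal stand-in for a transaction: effects apply only on commit. */
       static final class ToyTxn {
           private final List<Runnable> pending = new ArrayList<>();

           void enlist(Runnable effect) {
               pending.add(effect);
           }

           void commit() {
               pending.forEach(Runnable::run);
               pending.clear();
           }
       }

       /**
        * Processes a single input message that is redelivered until acked.
        * Returns how many output messages end up published.
        */
       public static int outputsPublished(boolean ackInsideTxn,
                                          boolean crashAfterFirstCommit) {
           List<String> output = new ArrayList<>();
           boolean[] acked = {false};
           boolean crashPending = crashAfterFirstCommit;

           while (!acked[0]) {
               ToyTxn txn = new ToyTxn();
               txn.enlist(() -> output.add("event"));
               if (ackInsideTxn) {
                   // The ack becomes visible together with the output.
                   txn.enlist(() -> acked[0] = true);
               }
               txn.commit();

               if (crashPending) {
                   // Simulated crash: the sink restarts before reaching the ack.
                   crashPending = false;
                   continue;
               }
               if (!ackInsideTxn) {
                   // Separate, non-transactional ack (like record.ack()).
                   acked[0] = true;
               }
           }
           return output.size();
       }

       public static void main(String[] args) {
           System.out.println(outputsPublished(false, true)); // prints 2
           System.out.println(outputsPublished(true, true));  // prints 1
       }
   }
   ```
   
   In this model the separate ack yields a duplicate output after the
   simulated crash, while the ack enlisted in the transaction does not.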
   
   ### Solution
   
   _No response_
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

