GitHub user startjava edited a comment on the discussion: has MessageListener 
interface and Transactions DEMO code??

i has not interface demo:
public class PulsarConsumerTransaction {

    public static void main(String[] args) throws Exception {
        String serviceUrl = 
"pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(serviceUrl)
                .enableTransaction(true)
                .build();

        String txnTopic1 = "persistent://my-tenant/my-ns/my-txn-topic1";
        String txnTopic2 = "persistent://my-tenant/my-ns/my-txn-topic2";
        List<String> topicList = new ArrayList<>();
        topicList.add(txnTopic1);
        topicList.add(txnTopic2);

        Transaction transaction = pulsarClient.newTransaction()
                .withTransactionTimeout(1, TimeUnit.SECONDS)
                .build().get();

        Consumer<User> consumer = 
pulsarClient.newConsumer(AvroSchema.of(User.class))
                .topics(topicList)
                .subscriptionName("consume-txn")
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        //Messages m = consumer.batchReceive();
        int i=0;
        while (true) {
            Message<User> message = consumer.receive();

            try {
                // Do something with the message
                System.out.println("Message received: " + message.getValue());

                // Acknowledge the message so that it can be deleted by the 
message broker
                consumer.acknowledgeCumulativeAsync(message.getMessageId(), 
transaction);
            } catch (Exception e) {
                // Message failed to process, redeliver later
                consumer.negativeAcknowledge(message);
            }
            i++;
            if (i >= 5) {
                transaction.commit();
                transaction = 
pulsarClient.newTransaction().withTransactionTimeout(1, 
TimeUnit.SECONDS).build().get();
                i = 0;
            }
        }
    }
} 

How do I switch to the interface demo code?

GitHub link: 
https://github.com/apache/pulsar/discussions/20184#discussioncomment-5726483

----
This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org

Reply via email to