This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9e6f7a130e22381a9726bf02ea985a9ab380a633
Author: Omar Yasin <oma...@gmail.com>
AuthorDate: Thu Sep 4 23:11:31 2025 +0100

    [improve][client]  PIP-407 Add newMessage with schema and transactions (#23942)
    
    Co-authored-by: Ómar Kjartan Yasin <oya...@apple.com>
    (cherry picked from commit 536dce3a9bbb62985400acf69d67516603a300a8)
---
 .../pulsar/broker/transaction/TransactionTest.java | 37 ++++++++++++++++++++++
 .../org/apache/pulsar/client/api/Producer.java     | 15 +++++++++
 .../apache/pulsar/client/impl/ProducerBase.java    |  9 ++++++
 3 files changed, 61 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index c54ffe9a73a..c3f969bf5f7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.buffer.Unpooled;
@@ -488,6 +489,42 @@ public class TransactionTest extends TransactionTestBase {
         });
     }
 
+    @Test
+    public void testSendAndAckWithSchema() throws Exception {
+        String topic = NAMESPACE1 + "/testAsyncSendAndAckWithSchema";
+        String topicName = "subscription";
+        getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .producerName("producer")
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(topicName)
+                .subscribe();
+
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        MessageId messageId = producer.newMessage(Schema.STRING, txn)
+                .value("testSendAndAckWithSchema")
+                .send();
+
+        txn.commit().get();
+
+        Message<byte[]> message = consumer.receive();
+        assertEquals(new String(message.getValue(), StandardCharsets.UTF_8), "testSendAndAckWithSchema");
+        assertNotNull(message.getSchemaVersion());
+    }
+
     @Test
     public void testGetTxnID() throws Exception {
         Transaction transaction = pulsarClient.newTransaction()
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
index 3c5102eed2f..d6b07713708 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
@@ -191,6 +191,21 @@ public interface Producer<T> extends Closeable {
      * @since 2.7.0
      */
     TypedMessageBuilder<T> newMessage(Transaction txn);
+
+    /**
+     * Create a new message builder with transaction and schema, not required same parameterized type with the
+     * producer.
+     *
+     * <p>After the transaction commit, it will be made visible to consumer.
+     *
+     * <p>After the transaction abort, it will never be visible to consumer.
+     *
+     * @return a typed message builder that can be used to construct the message to be sent through this producer
+     * @see #newMessage()
+     */
+    <V> TypedMessageBuilder<V> newMessage(Schema<V> schema,
+                                          Transaction txn);
+
     /**
      * Get the last sequence id that was published by this producer.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 12e380fdd51..b7085d28ee2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -79,6 +79,7 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
         return new TypedMessageBuilderImpl<>(this, schema);
     }
 
+    @Override
     public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema) {
         checkArgument(schema != null);
         return new TypedMessageBuilderImpl<>(this, schema);
@@ -92,6 +93,14 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
         return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
     }
 
+    @Override
+    public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema,
+                                                 Transaction txn) {
+        checkArgument(txn instanceof TransactionImpl);
+        checkArgument(schema != null);
+        return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn);
+    }
+
     abstract CompletableFuture<MessageId> internalSendAsync(Message<?> message);
 
     abstract CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn);

Reply via email to