This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9e6f7a130e22381a9726bf02ea985a9ab380a633 Author: Omar Yasin <oma...@gmail.com> AuthorDate: Thu Sep 4 23:11:31 2025 +0100 [improve][client] PIP-407 Add newMessage with schema and transactions (#23942) Co-authored-by: Ómar Kjartan Yasin <oya...@apple.com> (cherry picked from commit 536dce3a9bbb62985400acf69d67516603a300a8) --- .../pulsar/broker/transaction/TransactionTest.java | 37 ++++++++++++++++++++++ .../org/apache/pulsar/client/api/Producer.java | 15 +++++++++ .../apache/pulsar/client/impl/ProducerBase.java | 9 ++++++ 3 files changed, 61 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index c54ffe9a73a..c3f969bf5f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -35,6 +35,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.Unpooled; @@ -488,6 +489,42 @@ public class TransactionTest extends TransactionTestBase { }); } + @Test + public void testSendAndAckWithSchema() throws Exception { + String topic = NAMESPACE1 + "/testAsyncSendAndAckWithSchema"; + String topicName = "subscription"; + getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false); + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .producerName("producer") + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(topicName) + .subscribe(); + + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS) + .build() + .get(); + + MessageId messageId = producer.newMessage(Schema.STRING, txn) + .value("testSendAndAckWithSchema") + .send(); + + txn.commit().get(); + + Message<byte[]> message = consumer.receive(); + assertEquals(new String(message.getValue(), StandardCharsets.UTF_8), "testSendAndAckWithSchema"); + assertNotNull(message.getSchemaVersion()); + } + @Test public void testGetTxnID() throws Exception { Transaction transaction = pulsarClient.newTransaction() diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java index 3c5102eed2f..d6b07713708 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java @@ -191,6 +191,21 @@ public interface Producer<T> extends Closeable { * @since 2.7.0 */ TypedMessageBuilder<T> newMessage(Transaction txn); + + /** + * Create a new message builder with transaction and schema, not required same parameterized type with the + * producer. + * + * <p>After the transaction commit, it will be made visible to consumer. + * + * <p>After the transaction abort, it will never be visible to consumer. + * + * @return a typed message builder that can be used to construct the message to be sent through this producer + * @see #newMessage() + */ + <V> TypedMessageBuilder<V> newMessage(Schema<V> schema, + Transaction txn); + /** * Get the last sequence id that was published by this producer. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java index 12e380fdd51..b7085d28ee2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java @@ -79,6 +79,7 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T return new TypedMessageBuilderImpl<>(this, schema); } + @Override public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema) { checkArgument(schema != null); return new TypedMessageBuilderImpl<>(this, schema); @@ -92,6 +93,14 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn); } + @Override + public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema, + Transaction txn) { + checkArgument(txn instanceof TransactionImpl); + checkArgument(schema != null); + return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) txn); + } + abstract CompletableFuture<MessageId> internalSendAsync(Message<?> message); abstract CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn);