This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new de5221b2a45 [fix][client] Fix async APIs to return failed futures on 
validation errors (#25287)
de5221b2a45 is described below

commit de5221b2a45f17e2de17ec5bdf1ab96c3e4b309e
Author: Hao Zhang <[email protected]>
AuthorDate: Mon Mar 9 19:42:23 2026 +0800

    [fix][client] Fix async APIs to return failed futures on validation errors 
(#25287)
    
    Co-authored-by: 张浩 <[email protected]>
---
 .../pulsar/client/impl/MessageChunkingTest.java    | 37 ----------------------
 .../apache/pulsar/client/impl/ConsumerBase.java    | 11 +++++--
 .../client/impl/MultiTopicsConsumerImpl.java       |  4 ++-
 .../pulsar/client/impl/ProducerBuilderImpl.java    |  6 ++--
 .../client/impl/MultiTopicsConsumerImplTest.java   | 11 +++++++
 .../client/impl/ProducerBuilderImplTest.java       | 20 ++++++++++++
 6 files changed, 46 insertions(+), 43 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 1a122b94416..d8a3698636d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -99,19 +99,6 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         return new Object[][] { { true }, { false } };
     }
 
-    @Test
-    public void testInvalidConfig() throws Exception {
-        final String topicName = "persistent://my-property/my-ns/my-topic1";
-        ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topicName);
-        // batching and chunking can't be enabled together
-        try {
-            Producer<byte[]> producer = 
producerBuilder.enableChunking(true).enableBatching(true).create();
-            fail("producer creation should have fail");
-        } catch (IllegalArgumentException ie) {
-            // Ok
-        }
-    }
-
     @Test(dataProvider = "ackReceiptEnabledWithMaxMessageSize")
     public void testLargeMessage(boolean ackReceiptEnabled, boolean 
clientSizeMaxMessageSize) throws Exception {
 
@@ -448,30 +435,6 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         Assert.assertEquals(((ConsumerImpl<String>) 
consumer).getAvailablePermits(), 8);
     }
 
-    /**
-     * Validate that chunking is not supported with batching and 
non-persistent topic.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testInvalidUseCaseForChunking() throws Exception {
-
-        log.info("-- Starting {} test --", methodName);
-        this.conf.setMaxMessageSize(5);
-        final String topicName = "persistent://my-property/my-ns/my-topic1";
-
-        ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topicName);
-
-        try {
-            Producer<byte[]> producer = 
producerBuilder.enableChunking(true).enableBatching(true).create();
-            fail("it should have failied because chunking can't be used with 
batching enabled");
-        } catch (IllegalArgumentException ie) {
-            // Ok
-        }
-
-        log.info("-- Exiting {} test --", methodName);
-    }
-
     @Test
     public void testExpireIncompleteChunkMessage() throws Exception{
         final String topicName = "persistent://prop/use/ns-abc/expireMsg";
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 1755c94b0de..aed525c9eee 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
@@ -662,7 +661,10 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         }
         TransactionImpl txnImpl = null;
         if (null != txn) {
-            checkArgument(txn instanceof TransactionImpl);
+            if (!(txn instanceof TransactionImpl)) {
+                return FutureUtil.failedFuture(new IllegalArgumentException(
+                        "Expected txn to be an instance of TransactionImpl, 
but got " + txn.getClass().getName()));
+            }
             txnImpl = (TransactionImpl) txn;
             CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
            if (!txnImpl.checkIfOpen(completableFuture)) {
@@ -691,7 +693,10 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
 
         TransactionImpl txnImpl = null;
         if (null != txn) {
-            checkArgument(txn instanceof TransactionImpl);
+            if (!(txn instanceof TransactionImpl)) {
+                return FutureUtil.failedFuture(new IllegalArgumentException(
+                        "Expected txn to be an instance of TransactionImpl, 
but got " + txn.getClass().getName()));
+            }
             txnImpl = (TransactionImpl) txn;
         }
         return doAcknowledgeWithTxn(messageId, AckType.Cumulative, 
Collections.emptyMap(), txnImpl);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8061da4e91c..7865939d77d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1247,7 +1247,9 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     // un-subscribe a given topic
     public CompletableFuture<Void> unsubscribeAsync(String topicName) {
-        checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + 
topicName);
+        if (!TopicName.isValid(topicName)) {
+            return FutureUtil.failedFuture(new 
IllegalArgumentException("Invalid topic name:" + topicName));
+        }
 
         if (getState() == State.Closing || getState() == State.Closed) {
             return FutureUtil.failedFuture(
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index e176cc41bc6..fe79d618c6a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -93,8 +93,10 @@ public class ProducerBuilderImpl<T> implements 
ProducerBuilder<T> {
     @Override
     public CompletableFuture<Producer<T>> createAsync() {
         // config validation
-        checkArgument(!(conf.isBatchingEnabled() && conf.isChunkingEnabled()),
-                "Batching and chunking of messages can't be enabled together");
+        if (conf.isBatchingEnabled() && conf.isChunkingEnabled()) {
+            return FutureUtil.failedFuture(
+                    new IllegalArgumentException("Batching and chunking of 
messages can't be enabled together"));
+        }
         if (conf.getTopicName() == null) {
             return FutureUtil
                     .failedFuture(new IllegalArgumentException("Topic name 
must be set on the producer builder"));
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 02a4d2ebba8..54175613e3b 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -39,6 +39,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -223,6 +224,16 @@ public class MultiTopicsConsumerImplTest {
         verify(clientMock, times(1)).cleanupConsumer(any());
     }
 
+    @Test
+    public void testUnsubscribeAsyncInvalidTopicNameReturnsFailedFuture() {
+        MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
+        CompletableFuture<Void> future = 
consumer.unsubscribeAsync("persistent://public/invalid-topic");
+
+        assertTrue(future.isCompletedExceptionally());
+        CompletionException ex = expectThrows(CompletionException.class, 
future::join);
+        assertTrue(ex.getCause() instanceof IllegalArgumentException);
+    }
+
     @Test
     public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics() 
throws Exception {
         ExecutorProvider executorProvider = mock(ExecutorProvider.class);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index 45e31e00b4c..738614398b1 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -23,9 +23,12 @@ import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRouter;
@@ -385,6 +388,23 @@ public class ProducerBuilderImplTest {
         producerBuilderImpl.maxPendingMessagesAcrossPartitions(1000);
     }
 
+    @Test
+    public void testCreateAsyncFailsWhenBatchingAndChunkingEnabled() {
+        producerBuilderImpl = new ProducerBuilderImpl<>(client, Schema.BYTES);
+        CompletableFuture<Producer<byte[]>> future = 
producerBuilderImpl.topic(TOPIC_NAME)
+                .enableBatching(true)
+                .enableChunking(true)
+                .createAsync();
+
+        assertTrue(future.isCompletedExceptionally());
+        try {
+            future.join();
+            fail("Expected IllegalArgumentException");
+        } catch (CompletionException e) {
+            assertTrue(e.getCause() instanceof IllegalArgumentException);
+        }
+    }
+
     private class CustomMessageRouter implements MessageRouter {
         @Override
         public int choosePartition(Message<?> msg, TopicMetadata metadata) {

Reply via email to