This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b56f238f6aa [fix][client] Fix ReaderBuilder doest not give 
illegalArgument on connection failure retry (#22639)
b56f238f6aa is described below

commit b56f238f6aaffdc0b37b9f6e2185b219f8708570
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri May 10 04:10:31 2024 -0700

    [fix][client] Fix ReaderBuilder doest not give illegalArgument on 
connection failure retry (#22639)
---
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 27 ++++++++++++++++++++++
 .../pulsar/client/impl/ReaderBuilderImpl.java      |  5 ++--
 .../apache/pulsar/client/impl/BuildersTest.java    |  2 +-
 3 files changed, 31 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 2d3e8d4c6e9..12228220b18 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -36,6 +36,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.Reader;
@@ -902,4 +905,28 @@ public class ReaderTest extends 
MockedPulsarServiceBaseTest {
             assertTrue(reader.hasMessageAvailable());
         }
     }
+
+    @Test
+    public void testReaderBuilderStateOnRetryFailure() throws Exception {
+        String ns = "my-property/my-ns";
+        String topic = "persistent://" + ns + "/testRetryReader";
+        RetentionPolicies retention = new RetentionPolicies(-1, -1);
+        admin.namespaces().setRetention(ns, retention);
+        String badUrl = "pulsar://bad-host:8080";
+
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(badUrl).build();
+
+        ReaderBuilder<byte[]> readerBuilder = 
client.newReader().topic(topic).startMessageFromRollbackDuration(100,
+                TimeUnit.SECONDS);
+
+        for (int i = 0; i < 3; i++) {
+            try {
+                readerBuilder.createAsync().get(1, TimeUnit.SECONDS);
+            } catch (TimeoutException e) {
+                log.info("It should time out due to invalid url");
+            } catch (IllegalArgumentException e) {
+                fail("It should not fail with corrupt reader state");
+            }
+        }
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 2860cda0cee..ef230475be5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -86,8 +86,9 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> 
{
                     .failedFuture(new IllegalArgumentException("Topic name 
must be set on the reader builder"));
         }
 
-        if (conf.getStartMessageId() != null && 
conf.getStartMessageFromRollbackDurationInSec() > 0
-                || conf.getStartMessageId() == null && 
conf.getStartMessageFromRollbackDurationInSec() <= 0) {
+        boolean isStartMsgIdExist = conf.getStartMessageId() != null && 
conf.getStartMessageId() != MessageId.earliest;
+        if ((isStartMsgIdExist && 
conf.getStartMessageFromRollbackDurationInSec() > 0)
+                || (conf.getStartMessageId() == null && 
conf.getStartMessageFromRollbackDurationInSec() <= 0)) {
             return FutureUtil
                     .failedFuture(new IllegalArgumentException(
                             "Start message id or start message from roll back 
must be specified but they cannot be"
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
index 607689e0e2b..5f52f86d8b0 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
@@ -106,7 +106,7 @@ public class BuildersTest {
     @Test(expectedExceptions = {PulsarClientException.class}, 
expectedExceptionsMessageRegExp = ".* must be specified but they cannot be 
specified at the same time.*")
     public void shouldNotSetTwoOptAtTheSameTime() throws Exception {
         PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
-        try (Reader reader = 
client.newReader().topic("abc").startMessageId(MessageId.earliest)
+        try (Reader reader = 
client.newReader().topic("abc").startMessageId(MessageId.latest)
                 .startMessageFromRollbackDuration(10, 
TimeUnit.HOURS).create()) {
             // no-op
         } finally {

Reply via email to