This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8b2d5e9ce9d5dc2c529ac7d885441c966dafa100 Author: Rajan Dhabalia <[email protected]> AuthorDate: Fri May 10 04:10:31 2024 -0700 [fix][client] Fix ReaderBuilder does not give illegalArgument on connection failure retry (#22639) (cherry picked from commit b56f238f6aaffdc0b37b9f6e2185b219f8708570) --- .../org/apache/pulsar/client/impl/ReaderTest.java | 27 ++++++++++++++++++++++ .../pulsar/client/impl/ReaderBuilderImpl.java | 5 ++-- .../apache/pulsar/client/impl/BuildersTest.java | 2 +- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 00c3eadb06a..df9a97c29eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -36,6 +36,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.Reader; @@ -851,4 +854,28 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { assertTrue(reader.hasMessageAvailable()); } } + + @Test + public void testReaderBuilderStateOnRetryFailure() throws Exception { + String ns = "my-property/my-ns"; + String topic = "persistent://" + ns + "/testRetryReader"; + 
RetentionPolicies retention = new RetentionPolicies(-1, -1); + admin.namespaces().setRetention(ns, retention); + String badUrl = "pulsar://bad-host:8080"; + + PulsarClient client = PulsarClient.builder().serviceUrl(badUrl).build(); + + ReaderBuilder<byte[]> readerBuilder = client.newReader().topic(topic).startMessageFromRollbackDuration(100, + TimeUnit.SECONDS); + + for (int i = 0; i < 3; i++) { + try { + readerBuilder.createAsync().get(1, TimeUnit.SECONDS); + } catch (TimeoutException e) { + log.info("It should time out due to invalid url"); + } catch (IllegalArgumentException e) { + fail("It should not fail with corrupt reader state"); + } + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index ca2011cf18a..7e74a8e9c9b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -85,8 +85,9 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> { .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder")); } - if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0 - || conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) { + boolean isStartMsgIdExist = conf.getStartMessageId() != null && conf.getStartMessageId() != MessageId.earliest; + if ((isStartMsgIdExist && conf.getStartMessageFromRollbackDurationInSec() > 0) + || (conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0)) { return FutureUtil .failedFuture(new IllegalArgumentException( "Start message id or start message from roll back must be specified but they cannot be" diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java index 607689e0e2b..5f52f86d8b0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java @@ -106,7 +106,7 @@ public class BuildersTest { @Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*") public void shouldNotSetTwoOptAtTheSameTime() throws Exception { PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); - try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.earliest) + try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.latest) .startMessageFromRollbackDuration(10, TimeUnit.HOURS).create()) { // no-op } finally {
