This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b56f238f6aa [fix][client] Fix ReaderBuilder doest not give
illegalArgument on connection failure retry (#22639)
b56f238f6aa is described below
commit b56f238f6aaffdc0b37b9f6e2185b219f8708570
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri May 10 04:10:31 2024 -0700
[fix][client] Fix ReaderBuilder doest not give illegalArgument on
connection failure retry (#22639)
---
.../org/apache/pulsar/client/impl/ReaderTest.java | 27 ++++++++++++++++++++++
.../pulsar/client/impl/ReaderBuilderImpl.java | 5 ++--
.../apache/pulsar/client/impl/BuildersTest.java | 2 +-
3 files changed, 31 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 2d3e8d4c6e9..12228220b18 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -36,6 +36,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
@@ -902,4 +905,28 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
assertTrue(reader.hasMessageAvailable());
}
}
+
+ @Test
+ public void testReaderBuilderStateOnRetryFailure() throws Exception {
+ String ns = "my-property/my-ns";
+ String topic = "persistent://" + ns + "/testRetryReader";
+ RetentionPolicies retention = new RetentionPolicies(-1, -1);
+ admin.namespaces().setRetention(ns, retention);
+ String badUrl = "pulsar://bad-host:8080";
+
+ PulsarClient client =
PulsarClient.builder().serviceUrl(badUrl).build();
+
+ ReaderBuilder<byte[]> readerBuilder =
client.newReader().topic(topic).startMessageFromRollbackDuration(100,
+ TimeUnit.SECONDS);
+
+ for (int i = 0; i < 3; i++) {
+ try {
+ readerBuilder.createAsync().get(1, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ log.info("It should time out due to invalid url");
+ } catch (IllegalArgumentException e) {
+ fail("It should not fail with corrupt reader state");
+ }
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 2860cda0cee..ef230475be5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -86,8 +86,9 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
.failedFuture(new IllegalArgumentException("Topic name
must be set on the reader builder"));
}
- if (conf.getStartMessageId() != null &&
conf.getStartMessageFromRollbackDurationInSec() > 0
- || conf.getStartMessageId() == null &&
conf.getStartMessageFromRollbackDurationInSec() <= 0) {
+ boolean isStartMsgIdExist = conf.getStartMessageId() != null &&
conf.getStartMessageId() != MessageId.earliest;
+ if ((isStartMsgIdExist &&
conf.getStartMessageFromRollbackDurationInSec() > 0)
+ || (conf.getStartMessageId() == null &&
conf.getStartMessageFromRollbackDurationInSec() <= 0)) {
return FutureUtil
.failedFuture(new IllegalArgumentException(
"Start message id or start message from roll back
must be specified but they cannot be"
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
index 607689e0e2b..5f52f86d8b0 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
@@ -106,7 +106,7 @@ public class BuildersTest {
@Test(expectedExceptions = {PulsarClientException.class},
expectedExceptionsMessageRegExp = ".* must be specified but they cannot be
specified at the same time.*")
public void shouldNotSetTwoOptAtTheSameTime() throws Exception {
PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
- try (Reader reader =
client.newReader().topic("abc").startMessageId(MessageId.earliest)
+ try (Reader reader =
client.newReader().topic("abc").startMessageId(MessageId.latest)
.startMessageFromRollbackDuration(10,
TimeUnit.HOURS).create()) {
// no-op
} finally {