This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 77833d90dafe36e62b7962af60f25fc54389960b
Author: Yijie Shen <[email protected]>
AuthorDate: Mon Feb 24 12:24:58 2020 +0800

    [Reader] Should set either start message id or start message from roll back 
duration. (#6392)
    
    Currently, when constructing a reader, users can set both a start message 
id and a start time (rollback duration).
    
    This combination is ambiguous, so the reader builder now rejects it.
    (cherry picked from commit f862961cb84c0cc19dff29b8db5a54a6c578fbe4)
---
 .../org/apache/pulsar/client/impl/ReaderTest.java  |  2 +-
 .../pulsar/client/impl/ReaderBuilderImpl.java      | 10 +++++++--
 .../apache/pulsar/client/impl/BuildersTest.java    | 25 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 55daf6b..b6c7c01 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -229,7 +229,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest 
{
 
         // (3) Create reader and set position 1 hour back so, it should only 
read messages which are 2 hours old which
         // published on step 2
-        Reader<byte[]> reader = 
pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest)
+        Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
                 .startMessageFromRollbackDuration(2, TimeUnit.HOURS).create();
 
         List<MessageId> receivedMessageIds = Lists.newArrayList();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index c024a0b..921cf2d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -78,9 +78,15 @@ public class ReaderBuilderImpl<T> implements 
ReaderBuilder<T> {
                     .failedFuture(new IllegalArgumentException("Topic name 
must be set on the reader builder"));
         }
 
-        if (conf.getStartMessageId() == null) {
+        if (conf.getStartMessageId() != null && 
conf.getStartMessageFromRollbackDurationInSec() > 0 ||
+                conf.getStartMessageId() == null && 
conf.getStartMessageFromRollbackDurationInSec() <= 0) {
             return FutureUtil
-                    .failedFuture(new IllegalArgumentException("Start message 
id must be set on the reader builder"));
+                    .failedFuture(new IllegalArgumentException(
+                            "Start message id or start message from roll back 
must be specified but they cannot be specified at the same time"));
+        }
+
+        if (conf.getStartMessageFromRollbackDurationInSec() > 0) {
+            conf.setStartMessageId(MessageId.earliest);
         }
 
         return client.createReaderAsync(conf, schema);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
index 2560d67..4f7554e 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
@@ -26,8 +26,12 @@ import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.testng.annotations.Test;
 
@@ -96,5 +100,26 @@ public class BuildersTest {
         assertTrue(obj instanceof ReaderConfigurationData);
         assertEquals(((ReaderConfigurationData) obj).getTopicName(), 
topicName);
         assertEquals(((ReaderConfigurationData) obj).getStartMessageId(), 
messageId);
+        client.close();
+    }
+
+    @Test(expectedExceptions = {PulsarClientException.class}, 
expectedExceptionsMessageRegExp = ".* must be specified but they cannot be 
specified at the same time.*")
+    public void shouldNotSetTwoOptAtTheSameTime() throws Exception {
+        PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
+        try (Reader reader = 
client.newReader().topic("abc").startMessageId(MessageId.earliest).startMessageFromRollbackDuration(10,
 TimeUnit.HOURS).create()) {
+            // no-op
+        } finally {
+            client.close();
+        }
+    }
+
+    @Test(expectedExceptions = {PulsarClientException.class}, 
expectedExceptionsMessageRegExp = ".* must be specified but they cannot be 
specified at the same time.*")
+    public void shouldSetOneStartOpt() throws Exception {
+        PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
+        try (Reader reader = client.newReader().topic("abc").create()) {
+            // no-op
+        } finally {
+            client.close();
+        }
     }
 }

Reply via email to