This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 34f8e0e9456 [improve] [broker] Support create RawReader based on 
configuration (#22280)
34f8e0e9456 is described below

commit 34f8e0e9456674cd6459105cd7a3619b113b06cf
Author: Hang Chen <[email protected]>
AuthorDate: Mon Mar 18 18:49:52 2024 +0800

    [improve] [broker] Support create RawReader based on configuration (#22280)
---
 .../org/apache/pulsar/client/api/RawReader.java    | 11 ++++++
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  8 +++++
 .../apache/pulsar/client/impl/RawReaderTest.java   | 39 ++++++++++++++++++++--
 3 files changed, 56 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index b7805c36b3b..55483708fdf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.RawReaderImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 
 /**
  * Topic reader which receives raw messages (i.e. as they are stored in the 
managed ledger).
@@ -43,6 +44,16 @@ public interface RawReader {
         return future.thenApply(__ -> r);
     }
 
+    static CompletableFuture<RawReader> create(PulsarClient client,
+                                               
ConsumerConfigurationData<byte[]> consumerConfiguration,
+                                               boolean 
createTopicIfDoesNotExist) {
+        CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
+        RawReader r = new RawReaderImpl((PulsarClientImpl) client,
+                consumerConfiguration, future, createTopicIfDoesNotExist);
+        return future.thenApply(__ -> r);
+    }
+
+
     /**
      * Get the topic for the reader.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 3d7ad9f5865..5ac051d2271 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -65,6 +65,14 @@ public class RawReaderImpl implements RawReader {
         consumer = new RawConsumerImpl(client, consumerConfiguration, 
consumerFuture, createTopicIfDoesNotExist);
     }
 
+    public RawReaderImpl(PulsarClientImpl client, 
ConsumerConfigurationData<byte[]> consumerConfiguration,
+                         CompletableFuture<Consumer<byte[]>> consumerFuture,
+                         boolean createTopicIfDoesNotExist) {
+        this.consumerConfiguration = consumerConfiguration;
+        consumer = new RawConsumerImpl(client, consumerConfiguration, 
consumerFuture, createTopicIfDoesNotExist);
+    }
+
+
     @Override
     public String getTopic() {
         return consumerConfiguration.getTopicNames().stream()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index d3fcc36a546..d9ddc00b2e8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -44,6 +44,9 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -56,6 +59,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static 
org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE;
+
 @Test(groups = "broker-impl")
 @Slf4j
 public class RawReaderTest extends MockedPulsarServiceBaseTest {
@@ -195,6 +200,36 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
         reader.closeAsync().get(3, TimeUnit.SECONDS);
     }
 
+    @Test
+    public void testRawReaderWithConfigurationCreation() throws Exception {
+        int numKeys = 10;
+
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
+
+        Set<String> keys = publishMessages(topic, numKeys);
+        ConsumerConfigurationData<byte[]> consumerConfiguration = new 
ConsumerConfigurationData<>();
+        consumerConfiguration.getTopicNames().add(topic);
+        consumerConfiguration.setSubscriptionName(subscription);
+        consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
+        
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
+        consumerConfiguration.setReadCompacted(true);
+        
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+        consumerConfiguration.setAckReceiptEnabled(true);
+        RawReader reader = RawReader.create(pulsarClient, 
consumerConfiguration, true).get();
+
+        MessageId lastMessageId = reader.getLastMessageIdAsync().get();
+        while (true) {
+            try (RawMessage m = reader.readNextAsync().get()) {
+                Assert.assertTrue(keys.remove(extractKey(m)));
+                if (lastMessageId.compareTo(m.getMessageId()) == 0) {
+                    break;
+                }
+            }
+        }
+        Assert.assertTrue(keys.isEmpty());
+        reader.closeAsync().get(3, TimeUnit.SECONDS);
+    }
+
     @Test
     public void testSeekToStart() throws Exception {
         int numKeys = 10;
@@ -279,7 +314,7 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
      */
     @Test
     public void testFlowControl() throws Exception {
-        int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
+        int numMessages = DEFAULT_RECEIVER_QUEUE_SIZE * 5;
         String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
 
         publishMessages(topic, numMessages);
@@ -311,7 +346,7 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testFlowControlBatch() throws Exception {
-        int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
+        int numMessages = DEFAULT_RECEIVER_QUEUE_SIZE * 5;
         String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
 
         publishMessages(topic, numMessages, true);

Reply via email to