This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 34f8e0e9456 [improve] [broker] Support create RawReader based on
configuration (#22280)
34f8e0e9456 is described below
commit 34f8e0e9456674cd6459105cd7a3619b113b06cf
Author: Hang Chen <[email protected]>
AuthorDate: Mon Mar 18 18:49:52 2024 +0800
[improve] [broker] Support create RawReader based on configuration (#22280)
---
.../org/apache/pulsar/client/api/RawReader.java | 11 ++++++
.../apache/pulsar/client/impl/RawReaderImpl.java | 8 +++++
.../apache/pulsar/client/impl/RawReaderTest.java | 39 ++++++++++++++++++++--
3 files changed, 56 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index b7805c36b3b..55483708fdf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawReaderImpl;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
/**
* Topic reader which receives raw messages (i.e. as they are stored in the
managed ledger).
@@ -43,6 +44,16 @@ public interface RawReader {
return future.thenApply(__ -> r);
}
+ static CompletableFuture<RawReader> create(PulsarClient client,
+
ConsumerConfigurationData<byte[]> consumerConfiguration,
+ boolean
createTopicIfDoesNotExist) {
+ CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
+ RawReader r = new RawReaderImpl((PulsarClientImpl) client,
+ consumerConfiguration, future, createTopicIfDoesNotExist);
+ return future.thenApply(__ -> r);
+ }
+
+
/**
* Get the topic for the reader.
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 3d7ad9f5865..5ac051d2271 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -65,6 +65,14 @@ public class RawReaderImpl implements RawReader {
consumer = new RawConsumerImpl(client, consumerConfiguration,
consumerFuture, createTopicIfDoesNotExist);
}
+ public RawReaderImpl(PulsarClientImpl client,
ConsumerConfigurationData<byte[]> consumerConfiguration,
+ CompletableFuture<Consumer<byte[]>> consumerFuture,
+ boolean createTopicIfDoesNotExist) {
+ this.consumerConfiguration = consumerConfiguration;
+ consumer = new RawConsumerImpl(client, consumerConfiguration,
consumerFuture, createTopicIfDoesNotExist);
+ }
+
+
@Override
public String getTopic() {
return consumerConfiguration.getTopicNames().stream()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index d3fcc36a546..d9ddc00b2e8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -44,6 +44,9 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -56,6 +59,8 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static
org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE;
+
@Test(groups = "broker-impl")
@Slf4j
public class RawReaderTest extends MockedPulsarServiceBaseTest {
@@ -195,6 +200,36 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
reader.closeAsync().get(3, TimeUnit.SECONDS);
}
+ @Test
+ public void testRawReaderWithConfigurationCreation() throws Exception {
+ int numKeys = 10;
+
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
+
+ Set<String> keys = publishMessages(topic, numKeys);
+ ConsumerConfigurationData<byte[]> consumerConfiguration = new
ConsumerConfigurationData<>();
+ consumerConfiguration.getTopicNames().add(topic);
+ consumerConfiguration.setSubscriptionName(subscription);
+ consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
+
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
+ consumerConfiguration.setReadCompacted(true);
+
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+ consumerConfiguration.setAckReceiptEnabled(true);
+ RawReader reader = RawReader.create(pulsarClient,
consumerConfiguration, true).get();
+
+ MessageId lastMessageId = reader.getLastMessageIdAsync().get();
+ while (true) {
+ try (RawMessage m = reader.readNextAsync().get()) {
+ Assert.assertTrue(keys.remove(extractKey(m)));
+ if (lastMessageId.compareTo(m.getMessageId()) == 0) {
+ break;
+ }
+ }
+ }
+ Assert.assertTrue(keys.isEmpty());
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
+ }
+
@Test
public void testSeekToStart() throws Exception {
int numKeys = 10;
@@ -279,7 +314,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
*/
@Test
public void testFlowControl() throws Exception {
- int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
+ int numMessages = DEFAULT_RECEIVER_QUEUE_SIZE * 5;
String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numMessages);
@@ -311,7 +346,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
@Test
public void testFlowControlBatch() throws Exception {
- int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
+ int numMessages = DEFAULT_RECEIVER_QUEUE_SIZE * 5;
String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numMessages, true);