This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 128287e produce/consume with 1 partitioned topic (#4883)
128287e is described below
commit 128287e35eab6c3ceb951074dac711dd1d8e829d
Author: Jia Zhai <[email protected]>
AuthorDate: Tue Aug 6 20:17:20 2019 +0800
produce/consume with 1 partitioned topic (#4883)
Motivation
PR #4764 allowed creating a partitioned topic with 1 partition, but with the
Pulsar client, users are still not able to produce to or consume from it.
This fix makes sure users can create a consumer/producer for a partitioned
topic with 1 partition.
Modifications
- Existing and newly added unit tests pass.
---
.../client/api/SimpleProducerConsumerTest.java | 80 ++++++++++++++++++++++
.../client/impl/MultiTopicsConsumerImpl.java | 2 +-
.../pulsar/client/impl/PulsarClientImpl.java | 8 +--
3 files changed, 85 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 901bb5a..35d4c77 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
@@ -26,6 +27,7 @@ import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -3117,4 +3119,82 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
consumer5.close();
log.info("-- Exiting {} test --", methodName);
}
+
+ /**
+ * This test verifies Producer and Consumer of PartitionedTopic with 1
partition works well.
+ *
+ * <pre>
+ * 1. create producer/consumer with both original name and
PARTITIONED_TOPIC_SUFFIX.
+ * 2. verify producer/consumer could produce/consume messages from same
underline persistent topic.
+ * </pre>
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testPartitionedTopicWithOnePartition() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String topicName =
"persistent://my-property/my-ns/one-partitioned-topic";
+ final String subscriptionName = "my-sub-";
+
+ // create partitioned topic
+ admin.topics().createPartitionedTopic(topicName, 1);
+
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
1);
+
+ @Cleanup
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName + 1)
+ .consumerName("aaa")
+ .subscribe();
+ log.info("Consumer1 created. topic: {}", consumer1.getTopic());
+
+ @Cleanup
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topic(topicName + PARTITIONED_TOPIC_SUFFIX + 0)
+ .subscriptionName(subscriptionName + 2)
+ .consumerName("bbb")
+ .subscribe();
+ log.info("Consumer2 created. topic: {}", consumer2.getTopic());
+
+ @Cleanup
+ Producer<byte[]> producer1 = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+ log.info("Producer1 created. topic: {}", producer1.getTopic());
+
+ @Cleanup
+ Producer<byte[]> producer2 = pulsarClient.newProducer()
+ .topic(topicName + PARTITIONED_TOPIC_SUFFIX + 0)
+ .enableBatching(false)
+ .create();
+ log.info("Producer2 created. topic: {}", producer2.getTopic());
+
+ final int numMessages = 10;
+ for (int i = 0; i < numMessages; i++) {
+ producer1.newMessage()
+ .value(("one-partitioned-topic-value-producer1-" +
i).getBytes(UTF_8))
+ .send();
+
+ producer2.newMessage()
+ .value(("one-partitioned-topic-value-producer2-" +
i).getBytes(UTF_8))
+ .send();
+ }
+
+ for (int i = 0; i < numMessages * 2; i++) {
+ Message<byte[]> msg = consumer1.receive(200,
TimeUnit.MILLISECONDS);
+ assertNotNull(msg);
+ log.info("Consumer1 Received message '{}'.", new
String(msg.getValue(), UTF_8));
+
+ msg = consumer2.receive(200, TimeUnit.MILLISECONDS);
+ assertNotNull(msg);
+ log.info("Consumer2 Received message '{}'.", new
String(msg.getValue(), UTF_8));
+ }
+
+ assertNull(consumer1.receive(200, TimeUnit.MILLISECONDS));
+ assertNull(consumer2.receive(200, TimeUnit.MILLISECONDS));
+
+ log.info("-- Exiting {} test --", methodName);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 3521eb4..9130ecf 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -745,7 +745,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
List<CompletableFuture<Consumer<T>>> futureList;
- if (numPartitions > 1) {
+ if (numPartitions > 0) {
this.topics.putIfAbsent(topicName, numPartitions);
allTopicPartitionsNumber.addAndGet(numPartitions);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index a3923b7..37eb2a5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -255,7 +255,7 @@ public class PulsarClientImpl implements PulsarClient {
}
ProducerBase<T> producer;
- if (metadata.partitions > 1) {
+ if (metadata.partitions > 0) {
producer = new
PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf,
metadata.partitions,
producerCreatedFuture, schema, interceptors);
} else {
@@ -342,7 +342,7 @@ public class PulsarClientImpl implements PulsarClient {
ConsumerBase<T> consumer;
// gets the next single threaded executor from the list of
executors
ExecutorService listenerThread =
externalExecutorProvider.getExecutor();
- if (metadata.partitions > 1) {
+ if (metadata.partitions > 0) {
consumer =
MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
listenerThread, consumerSubscribedFuture,
metadata.partitions, schema, interceptors);
} else {
@@ -469,7 +469,7 @@ public class PulsarClientImpl implements PulsarClient {
log.debug("[{}] Received topic metadata. partitions: {}",
topic, metadata.partitions);
}
- if (metadata.partitions > 1) {
+ if (metadata.partitions > 0) {
readerFuture.completeExceptionally(
new PulsarClientException("Topic reader cannot be
created on a partitioned topic"));
return;
@@ -655,7 +655,7 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public CompletableFuture<List<String>> getPartitionsForTopic(String topic)
{
return getPartitionedTopicMetadata(topic).thenApply(metadata -> {
- if (metadata.partitions > 1) {
+ if (metadata.partitions > 0) {
TopicName topicName = TopicName.get(topic);
List<String> partitions = new ArrayList<>(metadata.partitions);
for (int i = 0; i < metadata.partitions; i++) {