This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 128287e  produce/consume with 1 partitioned topic (#4883)
128287e is described below

commit 128287e35eab6c3ceb951074dac711dd1d8e829d
Author: Jia Zhai <[email protected]>
AuthorDate: Tue Aug 6 20:17:20 2019 +0800

    produce/consume with 1 partitioned topic (#4883)
    
    Motivation
    In PR #4764, we allowed creating a partitioned topic with 1 partition, but in 
the Pulsar client, users are still not able to produce to or consume from it.
    This fix makes sure users can create a consumer/producer for a partitioned 
topic with 1 partition.
    
    Modifications
    - Existing and newly added unit tests pass.
---
 .../client/api/SimpleProducerConsumerTest.java     | 80 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  2 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  8 +--
 3 files changed, 85 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 901bb5a..35d4c77 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.spy;
@@ -26,6 +27,7 @@ import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -3117,4 +3119,82 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         consumer5.close();
         log.info("-- Exiting {} test --", methodName);
     }
+
+    /**
+     * This test verifies Producer and Consumer of PartitionedTopic with 1 
partition works well.
+     *
+     * <pre>
+     * 1. create producer/consumer with both original name and 
PARTITIONED_TOPIC_SUFFIX.
+     * 2. verify producer/consumer could produce/consume messages from same 
underline persistent topic.
+     * </pre>
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testPartitionedTopicWithOnePartition() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String topicName = 
"persistent://my-property/my-ns/one-partitioned-topic";
+        final String subscriptionName = "my-sub-";
+
+        // create partitioned topic
+        admin.topics().createPartitionedTopic(topicName, 1);
+        
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 
1);
+
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+            .topic(topicName)
+            .subscriptionName(subscriptionName + 1)
+            .consumerName("aaa")
+            .subscribe();
+        log.info("Consumer1 created. topic: {}", consumer1.getTopic());
+
+        @Cleanup
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+            .topic(topicName + PARTITIONED_TOPIC_SUFFIX + 0)
+            .subscriptionName(subscriptionName + 2)
+            .consumerName("bbb")
+            .subscribe();
+        log.info("Consumer2 created. topic: {}", consumer2.getTopic());
+
+        @Cleanup
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+            .topic(topicName)
+            .enableBatching(false)
+            .create();
+        log.info("Producer1 created. topic: {}", producer1.getTopic());
+
+        @Cleanup
+        Producer<byte[]> producer2 = pulsarClient.newProducer()
+            .topic(topicName + PARTITIONED_TOPIC_SUFFIX + 0)
+            .enableBatching(false)
+            .create();
+        log.info("Producer2 created. topic: {}", producer2.getTopic());
+
+        final int numMessages = 10;
+        for (int i = 0; i < numMessages; i++) {
+            producer1.newMessage()
+                .value(("one-partitioned-topic-value-producer1-" + 
i).getBytes(UTF_8))
+                .send();
+
+            producer2.newMessage()
+                .value(("one-partitioned-topic-value-producer2-" + 
i).getBytes(UTF_8))
+                .send();
+        }
+
+        for (int i = 0; i < numMessages * 2; i++) {
+            Message<byte[]> msg = consumer1.receive(200, 
TimeUnit.MILLISECONDS);
+            assertNotNull(msg);
+            log.info("Consumer1 Received message '{}'.", new 
String(msg.getValue(), UTF_8));
+
+            msg = consumer2.receive(200, TimeUnit.MILLISECONDS);
+            assertNotNull(msg);
+            log.info("Consumer2 Received message '{}'.", new 
String(msg.getValue(), UTF_8));
+        }
+
+        assertNull(consumer1.receive(200, TimeUnit.MILLISECONDS));
+        assertNull(consumer2.receive(200, TimeUnit.MILLISECONDS));
+
+        log.info("-- Exiting {} test --", methodName);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 3521eb4..9130ecf 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -745,7 +745,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
 
         List<CompletableFuture<Consumer<T>>> futureList;
-        if (numPartitions > 1) {
+        if (numPartitions > 0) {
             this.topics.putIfAbsent(topicName, numPartitions);
             allTopicPartitionsNumber.addAndGet(numPartitions);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index a3923b7..37eb2a5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -255,7 +255,7 @@ public class PulsarClientImpl implements PulsarClient {
             }
 
             ProducerBase<T> producer;
-            if (metadata.partitions > 1) {
+            if (metadata.partitions > 0) {
                 producer = new 
PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, 
metadata.partitions,
                         producerCreatedFuture, schema, interceptors);
             } else {
@@ -342,7 +342,7 @@ public class PulsarClientImpl implements PulsarClient {
             ConsumerBase<T> consumer;
             // gets the next single threaded executor from the list of 
executors
             ExecutorService listenerThread = 
externalExecutorProvider.getExecutor();
-            if (metadata.partitions > 1) {
+            if (metadata.partitions > 0) {
                 consumer = 
MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
                     listenerThread, consumerSubscribedFuture, 
metadata.partitions, schema, interceptors);
             } else {
@@ -469,7 +469,7 @@ public class PulsarClientImpl implements PulsarClient {
                 log.debug("[{}] Received topic metadata. partitions: {}", 
topic, metadata.partitions);
             }
 
-            if (metadata.partitions > 1) {
+            if (metadata.partitions > 0) {
                 readerFuture.completeExceptionally(
                         new PulsarClientException("Topic reader cannot be 
created on a partitioned topic"));
                 return;
@@ -655,7 +655,7 @@ public class PulsarClientImpl implements PulsarClient {
     @Override
     public CompletableFuture<List<String>> getPartitionsForTopic(String topic) 
{
         return getPartitionedTopicMetadata(topic).thenApply(metadata -> {
-            if (metadata.partitions > 1) {
+            if (metadata.partitions > 0) {
                 TopicName topicName = TopicName.get(topic);
                 List<String> partitions = new ArrayList<>(metadata.partitions);
                 for (int i = 0; i < metadata.partitions; i++) {

Reply via email to