This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-1.22
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-1.22 by this push:
     new e13bdb3  Fix lookup problem with partions in a non-persistent topics
e13bdb3 is described below

commit e13bdb32d1ddc55e1e1d35e7291d575bef8d4e41
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Feb 19 09:00:00 2018 -0800

    Fix lookup problem with partions in a non-persistent topics
---
 .../pulsar/broker/admin/PersistentTopics.java      |  4 +-
 .../pulsar/client/api/NonPersistentTopicTest.java  | 50 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
index 484a873..7c34142 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/PersistentTopics.java
@@ -1386,8 +1386,8 @@ public class PersistentTopics extends AdminResource {
                 throw ex;
             }
 
-            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), 
dn.getCluster(),
-                    dn.getNamespacePortion(), "persistent", 
dn.getEncodedLocalName());
+            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, 
dn.getNamespace(), dn.getDomain().toString(),
+                    dn.getEncodedLocalName());
 
             // validates global-namespace contains local/peer cluster: if 
peer/local cluster present then lookup can
             // serve/redirect request else fail partitioned-metadata-request 
so, client fails while creating
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index cebce57..021d83e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -186,6 +186,56 @@ public class NonPersistentTopicTest extends 
ProducerConsumerBase {
 
     }
 
+    @Test(dataProvider = "subscriptionType")
+    public void 
testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type) throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final int numPartitions = 5;
+        final String topic = 
"non-persistent://my-property/use/my-ns/partitioned-topic";
+        admin.nonPersistentTopics().createPartitionedTopic(topic, 
numPartitions);
+
+        PulsarClient client = PulsarClient.create("pulsar://localhost:" + 
BROKER_PORT);
+        ConsumerConfiguration consumerConf = new ConsumerConfiguration();
+        consumerConf.setSubscriptionType(type);
+        Consumer consumer = client.subscribe(topic, "subscriber-1", 
consumerConf);
+
+        Producer producer = client.createProducer(topic);
+
+        // Ensure all partitions exist
+        for (int i = 0; i < numPartitions; i++) {
+            DestinationName partition = 
DestinationName.get(topic).getPartition(i);
+            
assertNotNull(pulsar.getBrokerService().getTopicReference(partition.toString()));
+        }
+
+        int totalProduceMsg = 500;
+        for (int i = 0; i < totalProduceMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+            Thread.sleep(10);
+        }
+
+        Message msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < totalProduceMsg; i++) {
+            msg = consumer.receive(1, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.acknowledge(msg);
+                String receivedMessage = new String(msg.getData());
+                log.debug("Received message: [{}]", receivedMessage);
+                String expectedMessage = "my-message-" + i;
+                testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+            } else {
+                break;
+            }
+        }
+        assertEquals(messageSet.size(), totalProduceMsg);
+
+        producer.close();
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+        client.close();
+    }
+
     /**
      * It verifies that broker doesn't dispatch messages if consumer runs out 
of permits
      * filled out with messages

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to