This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 583ddee  Fixed lookups for v2 topics in C++ client lib with HTTP 
service URL (#2043)
583ddee is described below

commit 583ddee954d31721cef647faf504f40a216eedae
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 28 12:24:43 2018 -0700

    Fixed lookups for v2 topics in C++ client lib with HTTP service URL (#2043)
    
    * Fixed lookups for v2 topics in C++ client lib with HTTP service URL
    
    * Fixed formatting
    
    * Fixed path
    
    * Use different topic names in tests
    
    * Fixed formatting
---
 pulsar-client-cpp/lib/HTTPLookupService.cc   | 37 ++++++++++++++++++++++------
 pulsar-client-cpp/python/pulsar_test.py      | 27 ++++++++++++++++++++
 pulsar-client-cpp/tests/BasicEndToEndTest.cc | 30 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 8 deletions(-)

diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc 
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 30b2af6..36d11f5 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -21,8 +21,13 @@
 DECLARE_LOG_OBJECT()
 
 namespace pulsar {
-const static std::string V2_PATH = "/lookup/v2/destination/";
-const static std::string PARTITION_PATH = "/admin/persistent/";
+
+const static std::string V1_PATH = "/lookup/v2/destination/";
+const static std::string V2_PATH = "/lookup/v2/topic/";
+
+const static std::string ADMIN_PATH_V1 = "/admin/";
+const static std::string ADMIN_PATH_V2 = "/admin/v2/";
+
 const static int MAX_HTTP_REDIRECTS = 20;
 const static std::string PARTITION_METHOD_NAME = "partitions";
 const static int NUMBER_OF_LOOKUP_THREADS = 1;
@@ -53,9 +58,16 @@ Future<Result, LookupDataResultPtr> 
HTTPLookupService::lookupAsync(const std::st
     }
 
     std::stringstream completeUrlStream;
-    completeUrlStream << adminUrl_ << V2_PATH << "persistent/" << 
topicName->getProperty() << '/'
-                      << topicName->getCluster() << '/' << 
topicName->getNamespacePortion() << '/'
-                      << topicName->getEncodedLocalName();
+    if (topicName->isV2Topic()) {
+        completeUrlStream << adminUrl_ << V2_PATH << topicName->getDomain() << 
"/" << topicName->getProperty()
+                          << '/' << topicName->getNamespacePortion() << '/'
+                          << topicName->getEncodedLocalName();
+    } else {
+        completeUrlStream << adminUrl_ << V1_PATH << topicName->getDomain() << 
"/" << topicName->getProperty()
+                          << '/' << topicName->getCluster() << '/' << 
topicName->getNamespacePortion() << '/'
+                          << topicName->getEncodedLocalName();
+    }
+
     
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
 shared_from_this(),
                                                    promise, 
completeUrlStream.str(), Lookup));
     return promise.getFuture();
@@ -65,9 +77,18 @@ Future<Result, LookupDataResultPtr> 
HTTPLookupService::getPartitionMetadataAsync
     const TopicNamePtr &topicName) {
     LookupPromise promise;
     std::stringstream completeUrlStream;
-    completeUrlStream << adminUrl_ << PARTITION_PATH << 
topicName->getProperty() << '/'
-                      << topicName->getCluster() << '/' << 
topicName->getNamespacePortion() << '/'
-                      << topicName->getEncodedLocalName() << '/' << 
PARTITION_METHOD_NAME;
+
+    if (topicName->isV2Topic()) {
+        completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << 
topicName->getDomain() << '/'
+                          << topicName->getProperty() << '/' << 
topicName->getNamespacePortion() << '/'
+                          << topicName->getEncodedLocalName() << '/' << 
PARTITION_METHOD_NAME;
+    } else {
+        completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << 
topicName->getDomain() << '/'
+                          << topicName->getProperty() << '/' << 
topicName->getCluster() << '/'
+                          << topicName->getNamespacePortion() << '/' << 
topicName->getEncodedLocalName()
+                          << '/' << PARTITION_METHOD_NAME;
+    }
+
     
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
 shared_from_this(),
                                                    promise, 
completeUrlStream.str(), PartitionMetaData));
     return promise.getFuture();
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index b07abbc..d276730 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -543,6 +543,33 @@ class PulsarTest(TestCase):
         self.assertEqual(msg.data(), b'hello-0')
         client.close()
 
+    def test_v2_topics(self):
+        self._v2_topics(self.serviceUrl)
+
+    def test_v2_topics_http(self):
+        self._v2_topics(self.adminUrl)
+
+    def _v2_topics(self, url):
+        client = Client(url)
+        consumer = client.subscribe('my-v2-topic-producer-consumer',
+                                    'my-sub',
+                                    consumer_type=ConsumerType.Shared)
+        producer = client.create_producer('my-v2-topic-producer-consumer')
+        producer.send('hello')
+
+        msg = consumer.receive(1000)
+        self.assertTrue(msg)
+        self.assertEqual(msg.data(), b'hello')
+        consumer.acknowledge(msg)
+
+        try:
+            msg = consumer.receive(100)
+            self.assertTrue(False)  # Should not reach this point
+        except:
+            pass  # Exception is expected
+
+        client.close()
+
     def _check_value_error(self, fun):
         try:
             fun()
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 692e21f..f3a9f12 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -260,6 +260,36 @@ TEST(BasicEndToEndTest, testNonPersistentTopic) {
     ASSERT_EQ(ResultOk, result);
 }
 
+TEST(BasicEndToEndTest, testV2TopicProtobuf) {
+    std::string topicName = "testV2TopicProtobuf";
+    Client client(lookupUrl);
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, "my-sub-name", consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    producer.close();
+    consumer.close();
+}
+
+TEST(BasicEndToEndTest, testV2TopicHttp) {
+    std::string topicName = "testV2TopicHttp";
+    Client client(adminUrl);
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, "my-sub-name", consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    producer.close();
+    consumer.close();
+}
+
 TEST(BasicEndToEndTest, testSingleClientMultipleSubscriptions) {
     std::string topicName = 
"persistent://prop/unit/ns1/testSingleClientMultipleSubscriptions";
 

Reply via email to