merlimat closed pull request #2043: Fixed lookups for v2 topics in C++ client 
lib with HTTP service URL
URL: https://github.com/apache/incubator-pulsar/pull/2043
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc 
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 30b2af6795..36d11f5954 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -21,8 +21,13 @@
 DECLARE_LOG_OBJECT()
 
 namespace pulsar {
-const static std::string V2_PATH = "/lookup/v2/destination/";
-const static std::string PARTITION_PATH = "/admin/persistent/";
+
+const static std::string V1_PATH = "/lookup/v2/destination/";
+const static std::string V2_PATH = "/lookup/v2/topic/";
+
+const static std::string ADMIN_PATH_V1 = "/admin/";
+const static std::string ADMIN_PATH_V2 = "/admin/v2/";
+
 const static int MAX_HTTP_REDIRECTS = 20;
 const static std::string PARTITION_METHOD_NAME = "partitions";
 const static int NUMBER_OF_LOOKUP_THREADS = 1;
@@ -53,9 +58,16 @@ Future<Result, LookupDataResultPtr> 
HTTPLookupService::lookupAsync(const std::st
     }
 
     std::stringstream completeUrlStream;
-    completeUrlStream << adminUrl_ << V2_PATH << "persistent/" << 
topicName->getProperty() << '/'
-                      << topicName->getCluster() << '/' << 
topicName->getNamespacePortion() << '/'
-                      << topicName->getEncodedLocalName();
+    if (topicName->isV2Topic()) {
+        completeUrlStream << adminUrl_ << V2_PATH << topicName->getDomain() << 
"/" << topicName->getProperty()
+                          << '/' << topicName->getNamespacePortion() << '/'
+                          << topicName->getEncodedLocalName();
+    } else {
+        completeUrlStream << adminUrl_ << V1_PATH << topicName->getDomain() << 
"/" << topicName->getProperty()
+                          << '/' << topicName->getCluster() << '/' << 
topicName->getNamespacePortion() << '/'
+                          << topicName->getEncodedLocalName();
+    }
+
     
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
 shared_from_this(),
                                                    promise, 
completeUrlStream.str(), Lookup));
     return promise.getFuture();
@@ -65,9 +77,18 @@ Future<Result, LookupDataResultPtr> 
HTTPLookupService::getPartitionMetadataAsync
     const TopicNamePtr &topicName) {
     LookupPromise promise;
     std::stringstream completeUrlStream;
-    completeUrlStream << adminUrl_ << PARTITION_PATH << 
topicName->getProperty() << '/'
-                      << topicName->getCluster() << '/' << 
topicName->getNamespacePortion() << '/'
-                      << topicName->getEncodedLocalName() << '/' << 
PARTITION_METHOD_NAME;
+
+    if (topicName->isV2Topic()) {
+        completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << 
topicName->getDomain() << '/'
+                          << topicName->getProperty() << '/' << 
topicName->getNamespacePortion() << '/'
+                          << topicName->getEncodedLocalName() << '/' << 
PARTITION_METHOD_NAME;
+    } else {
+        completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << 
topicName->getDomain() << '/'
+                          << topicName->getProperty() << '/' << 
topicName->getCluster() << '/'
+                          << topicName->getNamespacePortion() << '/' << 
topicName->getEncodedLocalName()
+                          << '/' << PARTITION_METHOD_NAME;
+    }
+
     
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
 shared_from_this(),
                                                    promise, 
completeUrlStream.str(), PartitionMetaData));
     return promise.getFuture();
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index b07abbc998..d276730f2e 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -543,6 +543,33 @@ def test_seek(self):
         self.assertEqual(msg.data(), b'hello-0')
         client.close()
 
+    def test_v2_topics(self):
+        self._v2_topics(self.serviceUrl)
+
+    def test_v2_topics_http(self):
+        self._v2_topics(self.adminUrl)
+
+    def _v2_topics(self, url):
+        client = Client(url)
+        consumer = client.subscribe('my-v2-topic-producer-consumer',
+                                    'my-sub',
+                                    consumer_type=ConsumerType.Shared)
+        producer = client.create_producer('my-v2-topic-producer-consumer')
+        producer.send('hello')
+
+        msg = consumer.receive(1000)
+        self.assertTrue(msg)
+        self.assertEqual(msg.data(), b'hello')
+        consumer.acknowledge(msg)
+
+        try:
+            msg = consumer.receive(100)
+            self.assertTrue(False)  # Should not reach this point
+        except:
+            pass  # Exception is expected
+
+        client.close()
+
     def _check_value_error(self, fun):
         try:
             fun()
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 692e21fe5a..f3a9f1266f 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -260,6 +260,36 @@ TEST(BasicEndToEndTest, testNonPersistentTopic) {
     ASSERT_EQ(ResultOk, result);
 }
 
+TEST(BasicEndToEndTest, testV2TopicProtobuf) {
+    std::string topicName = "testV2TopicProtobuf";
+    Client client(lookupUrl);
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, "my-sub-name", consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    producer.close();
+    consumer.close();
+}
+
+TEST(BasicEndToEndTest, testV2TopicHttp) {
+    std::string topicName = "testV2TopicHttp";
+    Client client(adminUrl);
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, "my-sub-name", consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    producer.close();
+    consumer.close();
+}
+
 TEST(BasicEndToEndTest, testSingleClientMultipleSubscriptions) {
     std::string topicName = 
"persistent://prop/unit/ns1/testSingleClientMultipleSubscriptions";
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to