This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 94484a5 Fixed lookups for v2 topics in C++ client lib with HTTP
service URL (#2043)
94484a5 is described below
commit 94484a5aafe4ad261ff4e5637bfc2d1d2b929ee5
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 28 12:24:43 2018 -0700
Fixed lookups for v2 topics in C++ client lib with HTTP service URL (#2043)
* Fixed lookups for v2 topics in C++ client lib with HTTP service URL
* Fixed formatting
* Fixed path
* Use different topic names in tests
* Fixed formatting
---
pulsar-client-cpp/lib/HTTPLookupService.cc | 37 ++++++++++++++++++++++------
pulsar-client-cpp/python/pulsar_test.py | 27 ++++++++++++++++++++
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 30 ++++++++++++++++++++++
3 files changed, 86 insertions(+), 8 deletions(-)
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 30b2af6..36d11f5 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -21,8 +21,13 @@
DECLARE_LOG_OBJECT()
namespace pulsar {
-const static std::string V2_PATH = "/lookup/v2/destination/";
-const static std::string PARTITION_PATH = "/admin/persistent/";
+
+const static std::string V1_PATH = "/lookup/v2/destination/";
+const static std::string V2_PATH = "/lookup/v2/topic/";
+
+const static std::string ADMIN_PATH_V1 = "/admin/";
+const static std::string ADMIN_PATH_V2 = "/admin/v2/";
+
const static int MAX_HTTP_REDIRECTS = 20;
const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;
@@ -53,9 +58,16 @@ Future<Result, LookupDataResultPtr>
HTTPLookupService::lookupAsync(const std::st
}
std::stringstream completeUrlStream;
- completeUrlStream << adminUrl_ << V2_PATH << "persistent/" <<
topicName->getProperty() << '/'
- << topicName->getCluster() << '/' <<
topicName->getNamespacePortion() << '/'
- << topicName->getEncodedLocalName();
+ if (topicName->isV2Topic()) {
+ completeUrlStream << adminUrl_ << V2_PATH << topicName->getDomain() <<
"/" << topicName->getProperty()
+ << '/' << topicName->getNamespacePortion() << '/'
+ << topicName->getEncodedLocalName();
+ } else {
+ completeUrlStream << adminUrl_ << V1_PATH << topicName->getDomain() <<
"/" << topicName->getProperty()
+ << '/' << topicName->getCluster() << '/' <<
topicName->getNamespacePortion() << '/'
+ << topicName->getEncodedLocalName();
+ }
+
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
shared_from_this(),
promise,
completeUrlStream.str(), Lookup));
return promise.getFuture();
@@ -65,9 +77,18 @@ Future<Result, LookupDataResultPtr>
HTTPLookupService::getPartitionMetadataAsync
const TopicNamePtr &topicName) {
LookupPromise promise;
std::stringstream completeUrlStream;
- completeUrlStream << adminUrl_ << PARTITION_PATH <<
topicName->getProperty() << '/'
- << topicName->getCluster() << '/' <<
topicName->getNamespacePortion() << '/'
- << topicName->getEncodedLocalName() << '/' <<
PARTITION_METHOD_NAME;
+
+ if (topicName->isV2Topic()) {
+ completeUrlStream << adminUrl_ << ADMIN_PATH_V2 <<
topicName->getDomain() << '/'
+ << topicName->getProperty() << '/' <<
topicName->getNamespacePortion() << '/'
+ << topicName->getEncodedLocalName() << '/' <<
PARTITION_METHOD_NAME;
+ } else {
+ completeUrlStream << adminUrl_ << ADMIN_PATH_V1 <<
topicName->getDomain() << '/'
+ << topicName->getProperty() << '/' <<
topicName->getCluster() << '/'
+ << topicName->getNamespacePortion() << '/' <<
topicName->getEncodedLocalName()
+ << '/' << PARTITION_METHOD_NAME;
+ }
+
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
shared_from_this(),
promise,
completeUrlStream.str(), PartitionMetaData));
return promise.getFuture();
diff --git a/pulsar-client-cpp/python/pulsar_test.py
b/pulsar-client-cpp/python/pulsar_test.py
index b07abbc..d276730 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -543,6 +543,33 @@ class PulsarTest(TestCase):
self.assertEqual(msg.data(), b'hello-0')
client.close()
+ def test_v2_topics(self):
+ self._v2_topics(self.serviceUrl)
+
+ def test_v2_topics_http(self):
+ self._v2_topics(self.adminUrl)
+
+ def _v2_topics(self, url):
+ client = Client(url)
+ consumer = client.subscribe('my-v2-topic-producer-consumer',
+ 'my-sub',
+ consumer_type=ConsumerType.Shared)
+ producer = client.create_producer('my-v2-topic-producer-consumer')
+ producer.send('hello')
+
+ msg = consumer.receive(1000)
+ self.assertTrue(msg)
+ self.assertEqual(msg.data(), b'hello')
+ consumer.acknowledge(msg)
+
+ try:
+ msg = consumer.receive(100)
+ self.assertTrue(False) # Should not reach this point
+ except:
+ pass # Exception is expected
+
+ client.close()
+
def _check_value_error(self, fun):
try:
fun()
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 692e21f..f3a9f12 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -260,6 +260,36 @@ TEST(BasicEndToEndTest, testNonPersistentTopic) {
ASSERT_EQ(ResultOk, result);
}
+TEST(BasicEndToEndTest, testV2TopicProtobuf) {
+ std::string topicName = "testV2TopicProtobuf";
+ Client client(lookupUrl);
+ Producer producer;
+ Result result = client.createProducer(topicName, producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ result = client.subscribe(topicName, "my-sub-name", consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ producer.close();
+ consumer.close();
+}
+
+TEST(BasicEndToEndTest, testV2TopicHttp) {
+ std::string topicName = "testV2TopicHttp";
+ Client client(adminUrl);
+ Producer producer;
+ Result result = client.createProducer(topicName, producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ result = client.subscribe(topicName, "my-sub-name", consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ producer.close();
+ consumer.close();
+}
+
TEST(BasicEndToEndTest, testSingleClientMultipleSubscriptions) {
std::string topicName =
"persistent://prop/unit/ns1/testSingleClientMultipleSubscriptions";