merlimat closed pull request #1564: Support short topic name in cpp client
URL: https://github.com/apache/incubator-pulsar/pull/1564
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/lib/TopicName.cc 
b/pulsar-client-cpp/lib/TopicName.cc
index 1a79a5d30..0ad131396 100644
--- a/pulsar-client-cpp/lib/TopicName.cc
+++ b/pulsar-client-cpp/lib/TopicName.cc
@@ -52,10 +52,30 @@ TopicName::TopicName() {}
 bool TopicName::init(const std::string& topicName) {
     topicName_ = topicName;
     if (topicName.find("://") == std::string::npos) {
-        LOG_ERROR("Topic name is not valid, domain not present - " << 
topicName);
+        std::string topicNameCopy_ = topicName;
+        std::vector<std::string> pathTokens;
+        boost::algorithm::split(pathTokens, topicNameCopy_, 
boost::algorithm::is_any_of("/"));
+        if (pathTokens.size() == 3) {
+            topicName_ = "persistent://" + pathTokens[0] + "/" + pathTokens[1] 
+ "/" + pathTokens[2];
+        } else if (pathTokens.size() == 1) {
+            topicName_ = "persistent://public/default/" + pathTokens[0];
+        } else {
+            LOG_ERROR(
+                "Topic name is not valid, short topic name should be in the 
format of '<topic>' or "
+                "'<property>/<namespace>/<topic>' - "
+                << topicName);
+            return false;
+        }
+    }
+    isV2Topic_ = parse(topicName_, domain_, property_, cluster_, 
namespacePortion_, localName_);
+    if (isV2Topic_ && !cluster_.empty()) {
+        LOG_ERROR("V2 Topic name is not valid, cluster is not empty - " << 
topicName_ << " : cluster "
+                                                                        << 
cluster_);
+        return false;
+    } else if (!isV2Topic_ && cluster_.empty()) {
+        LOG_ERROR("V1 Topic name is not valid, cluster is empty - " << 
topicName_);
         return false;
     }
-    parse(topicName_, domain_, property_, cluster_, namespacePortion_, 
localName_);
     if (localName_.empty()) {
         LOG_ERROR("Topic name is not valid, topic name is empty - " << 
topicName_);
         return false;
@@ -63,28 +83,45 @@ bool TopicName::init(const std::string& topicName) {
     namespaceName_ = NamespaceName::get(property_, cluster_, 
namespacePortion_);
     return true;
 }
-void TopicName::parse(const std::string& topicName, std::string& domain, 
std::string& property,
+bool TopicName::parse(const std::string& topicName, std::string& domain, 
std::string& property,
                       std::string& cluster, std::string& namespacePortion, 
std::string& localName) {
     std::string topicNameCopy = topicName;
     boost::replace_first(topicNameCopy, "://", "/");
     std::vector<std::string> pathTokens;
     boost::algorithm::split(pathTokens, topicNameCopy, 
boost::algorithm::is_any_of("/"));
-    if (pathTokens.size() < 5) {
+    if (pathTokens.size() < 4) {
         LOG_ERROR("Topic name is not valid, does not have enough parts - " << 
topicName);
-        return;
+        return false;
     }
     domain = pathTokens[0];
-    property = pathTokens[1];
-    cluster = pathTokens[2];
-    namespacePortion = pathTokens[3];
+    size_t numSlashIndexes;
+    bool isV2Topic;
+    if (pathTokens.size() == 4) {
+        // New topic name without cluster name
+        property = pathTokens[1];
+        cluster = "";
+        namespacePortion = pathTokens[2];
+        localName = pathTokens[3];
+        numSlashIndexes = 3;
+        isV2Topic = true;
+    } else {
+        // Legacy topic name that includes cluster name
+        property = pathTokens[1];
+        cluster = pathTokens[2];
+        namespacePortion = pathTokens[3];
+        localName = pathTokens[4];
+        numSlashIndexes = 4;
+        isV2Topic = false;
+    }
     size_t slashIndex = -1;
-    // find four '/', whatever is left is topic local name
-    for (int i = 0; i < 4; i++) {
+    // find `numSlashIndexes` '/', whatever is left is topic local name
+    for (int i = 0; i < numSlashIndexes; i++) {
         slashIndex = topicNameCopy.find('/', slashIndex + 1);
     }
     // get index to next char to '/'
     slashIndex++;
     localName = topicNameCopy.substr(slashIndex, (topicNameCopy.size() - 
slashIndex));
+    return isV2Topic;
 }
 std::string TopicName::getEncodedName(const std::string& nameBeforeEncoding) {
     Lock lock(curlHandleMutex);
@@ -104,6 +141,8 @@ std::string TopicName::getEncodedName(const std::string& 
nameBeforeEncoding) {
     return nameAfterEncoding;
 }
 
+bool TopicName::isV2Topic() { return isV2Topic_; }
+
 std::string TopicName::getDomain() { return domain_; }
 
 std::string TopicName::getProperty() { return property_; }
@@ -125,9 +164,15 @@ bool TopicName::validate() {
     if (domain_.compare("persistent") != 0) {
         return false;
     }
-    if (!property_.empty() && !cluster_.empty() && !namespacePortion_.empty() 
&& !localName_.empty()) {
+    // cluster_ can be empty
+    if (!isV2Topic_ && !property_.empty() && !cluster_.empty() && 
!namespacePortion_.empty() &&
+        !localName_.empty()) {
+        // v1 topic format
         return NamedEntity::checkName(property_) && 
NamedEntity::checkName(cluster_) &&
                NamedEntity::checkName(namespacePortion_);
+    } else if (isV2Topic_ && !property_.empty() && !namespacePortion_.empty() 
&& !localName_.empty()) {
+        // v2 topic format
+        return NamedEntity::checkName(property_) && 
NamedEntity::checkName(namespacePortion_);
     } else {
         return false;
     }
@@ -142,7 +187,7 @@ boost::shared_ptr<TopicName> TopicName::get(const 
std::string& topicName) {
     if (ptr->validate()) {
         return ptr;
     } else {
-        LOG_ERROR("Topic name validation Failed");
+        LOG_ERROR("Topic name validation Failed - " << topicName);
         return boost::shared_ptr<TopicName>();
     }
 }
@@ -151,16 +196,25 @@ boost::shared_ptr<TopicName> TopicName::get(const 
std::string& topicName) {
 std::string TopicName::getLookupName() {
     std::stringstream ss;
     std::string seperator("/");
-    ss << domain_ << seperator << property_ << seperator << cluster_ << 
seperator << namespacePortion_
-       << seperator << getEncodedLocalName();
+    if (isV2Topic_ && cluster_.empty()) {
+        ss << domain_ << seperator << property_ << seperator << 
namespacePortion_ << seperator
+           << getEncodedLocalName();
+    } else {
+        ss << domain_ << seperator << property_ << seperator << cluster_ << 
seperator << namespacePortion_
+           << seperator << getEncodedLocalName();
+    }
     return ss.str();
 }
 
 std::string TopicName::toString() {
     std::stringstream ss;
     std::string seperator("/");
-    ss << domain_ << "://" << property_ << seperator << cluster_ << seperator 
<< namespacePortion_
-       << seperator << localName_;
+    if (isV2Topic_ && cluster_.empty()) {
+        ss << domain_ << "://" << property_ << seperator << namespacePortion_ 
<< seperator << localName_;
+    } else {
+        ss << domain_ << "://" << property_ << seperator << cluster_ << 
seperator << namespacePortion_
+           << seperator << localName_;
+    }
     return ss.str();
 }
 
diff --git a/pulsar-client-cpp/lib/TopicName.h 
b/pulsar-client-cpp/lib/TopicName.h
index b8d610c98..91b799436 100644
--- a/pulsar-client-cpp/lib/TopicName.h
+++ b/pulsar-client-cpp/lib/TopicName.h
@@ -38,9 +38,11 @@ class TopicName : public ServiceUnitId {
     std::string cluster_;
     std::string namespacePortion_;
     std::string localName_;
+    bool isV2Topic_;
     boost::shared_ptr<NamespaceName> namespaceName_;
 
    public:
+    bool isV2Topic();
     std::string getLookupName();
     std::string getDomain();
     std::string getProperty();
@@ -58,7 +60,7 @@ class TopicName : public ServiceUnitId {
     static CURL* getCurlHandle();
     static CURL* curl;
     static boost::mutex curlHandleMutex;
-    static void parse(const std::string& topicName, std::string& domain, 
std::string& property,
+    static bool parse(const std::string& topicName, std::string& domain, 
std::string& property,
                       std::string& cluster, std::string& namespacePortion, 
std::string& localName);
     TopicName();
     bool validate();
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index dfbf44ed3..f89096cec 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -240,11 +240,11 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
 TEST(BasicEndToEndTest, testNonExistingTopic) {
     Client client(lookupUrl);
     Producer producer;
-    Result result = client.createProducer("persistent://prop/unit/ns1", 
producer);
+    Result result = 
client.createProducer("persistent://prop//unit/ns1/testNonExistingTopic", 
producer);
     ASSERT_EQ(ResultInvalidTopicName, result);
 
     Consumer consumer;
-    result = client.subscribe("persistent://prop/unit/ns1", "my-sub-name", 
consumer);
+    result = 
client.subscribe("persistent://prop//unit/ns1/testNonExistingTopic", 
"my-sub-name", consumer);
     ASSERT_EQ(ResultInvalidTopicName, result);
 }
 
diff --git a/pulsar-client-cpp/tests/TopicNameTest.cc 
b/pulsar-client-cpp/tests/TopicNameTest.cc
index 373bbef16..5c9410a30 100644
--- a/pulsar-client-cpp/tests/TopicNameTest.cc
+++ b/pulsar-client-cpp/tests/TopicNameTest.cc
@@ -42,6 +42,42 @@ TEST(TopicNameTest, testTopicName) {
     ASSERT_TRUE(*topicName1 == *topicName2);
 }
 
+TEST(TopicNameTest, testShortTopicName) {
+    // "short-topic"
+    boost::shared_ptr<TopicName> tn1 = TopicName::get("short-topic");
+    ASSERT_EQ("public", tn1->getProperty());
+    ASSERT_EQ("", tn1->getCluster());
+    ASSERT_EQ("default", tn1->getNamespacePortion());
+    ASSERT_EQ("persistent", tn1->getDomain());
+    ASSERT_EQ(TopicName::getEncodedName("short-topic"), tn1->getLocalName());
+
+    // tenant/namespace/topic
+    boost::shared_ptr<TopicName> tn2 = 
TopicName::get("tenant/namespace/short-topic");
+    ASSERT_EQ("tenant", tn2->getProperty());
+    ASSERT_EQ("", tn2->getCluster());
+    ASSERT_EQ("namespace", tn2->getNamespacePortion());
+    ASSERT_EQ("persistent", tn2->getDomain());
+    ASSERT_EQ(TopicName::getEncodedName("short-topic"), tn2->getLocalName());
+
+    // tenant/cluster/namespace/topic
+    boost::shared_ptr<TopicName> tn3 = 
TopicName::get("tenant/cluster/namespace/short-topic");
+    ASSERT_FALSE(tn3);
+
+    // tenant/cluster
+    boost::shared_ptr<TopicName> tn4 = TopicName::get("tenant/cluster");
+    ASSERT_FALSE(tn4);
+}
+
+TEST(TopicNameTest, testTopicNameV2) {
+    // v2 topic names doesn't have "cluster"
+    boost::shared_ptr<TopicName> tn1 = 
TopicName::get("persistent://tenant/namespace/short-topic");
+    ASSERT_EQ("tenant", tn1->getProperty());
+    ASSERT_EQ("", tn1->getCluster());
+    ASSERT_EQ("namespace", tn1->getNamespacePortion());
+    ASSERT_EQ("persistent", tn1->getDomain());
+    ASSERT_EQ(TopicName::getEncodedName("short-topic"), tn1->getLocalName());
+}
+
 TEST(TopicNameTest, testTopicNameWithSlashes) {
     // Compare getters and setters
     boost::shared_ptr<TopicName> topicName =


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to