This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 96507ce  [feat] Support auto download schema when create producer. 
(#157)
96507ce is described below

commit 96507cecdc90f10eca5febc48a0623f72d338615
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Thu Jan 19 16:40:35 2023 +0800

    [feat] Support auto download schema when create producer. (#157)
    
    ### Motivation
     If a schema already exists on a topic but the producer does not know it, 
the producer may still want to send data to that topic. (One scenario is the DLQ 
sending a message: please refer to #139 **[Verifying this change][5]**)
    
    
    ### Modifications
    - When creating a producer with the `autoDownloadSchema` parameter enabled, 
try to fetch the schema of that topic and use it.
    - Support `getSchema` on `LookupService` (HTTP and Binary).
    
    
    ### Verifying this change
    
      - Add `LookupServiceTest` unit test to cover `getSchema` logic.
      - Add `SchemaTest.testAutoDownloadSchema` unit test to cover successful 
producer creation when the topic has a schema.
    
    Co-authored-by: Baodi Shi <bao...@apache.org>
---
 include/pulsar/Schema.h         |   3 +-
 lib/BinaryProtoLookupService.cc |  37 ++++++++++++++
 lib/BinaryProtoLookupService.h  |   7 +++
 lib/ClientConnection.cc         |  63 +++++++++++++++++++++++
 lib/ClientConnection.h          |   6 +++
 lib/ClientImpl.cc               |  27 ++++++++--
 lib/ClientImpl.h                |   6 ++-
 lib/Commands.cc                 |  16 ++++++
 lib/Commands.h                  |   2 +
 lib/HTTPLookupService.cc        | 109 ++++++++++++++++++++++++++++++++++++----
 lib/HTTPLookupService.h         |   6 +++
 lib/LookupService.h             |  10 ++++
 lib/RetryableLookupService.h    |   6 +++
 lib/Schema.cc                   |  67 +++++++++++++++---------
 lib/SchemaUtils.h               |  55 ++++++++++++++++++++
 test-conf/standalone-ssl.conf   |   3 ++
 tests/LookupServiceTest.cc      |  82 ++++++++++++++++++++++++++++++
 tests/SchemaTest.cc             |  39 ++++++++++++--
 18 files changed, 501 insertions(+), 43 deletions(-)

diff --git a/include/pulsar/Schema.h b/include/pulsar/Schema.h
index ad64c0b..93dac4f 100644
--- a/include/pulsar/Schema.h
+++ b/include/pulsar/Schema.h
@@ -134,6 +134,8 @@ enum SchemaType
 // Return string representation of result code
 PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType);
 
+PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr);
+
 class SchemaInfoImpl;
 
 typedef std::map<std::string, std::string> StringMap;
@@ -195,7 +197,6 @@ class PULSAR_PUBLIC SchemaInfo {
    private:
     typedef std::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr;
     SchemaInfoImplPtr impl_;
-    static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
 };
 
 }  // namespace pulsar
diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index b925db4..9ce3369 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -155,6 +155,43 @@ Future<Result, NamespaceTopicsPtr> 
BinaryProtoLookupService::getTopicsOfNamespac
     return promise->getFuture();
 }
 
+Future<Result, boost::optional<SchemaInfo>> 
BinaryProtoLookupService::getSchema(
+    const TopicNamePtr& topicName) {
+    GetSchemaPromisePtr promise = std::make_shared<Promise<Result, 
boost::optional<SchemaInfo>>>();
+
+    if (!topicName) {
+        promise->setFailed(ResultInvalidTopicName);
+        return promise->getFuture();
+    }
+    cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
+        
.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, 
topicName->toString(),
+                               std::placeholders::_1, std::placeholders::_2, 
promise));
+
+    return promise->getFuture();
+}
+
+void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& 
topicName, Result result,
+                                                    const 
ClientConnectionWeakPtr& clientCnx,
+                                                    GetSchemaPromisePtr 
promise) {
+    if (result != ResultOk) {
+        promise->setFailed(result);
+        return;
+    }
+
+    ClientConnectionPtr conn = clientCnx.lock();
+    uint64_t requestId = newRequestId();
+    LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: 
" << topicName);
+
+    conn->newGetSchema(topicName, requestId)
+        .addListener([promise](Result result, boost::optional<SchemaInfo> 
schemaInfo) {
+            if (result != ResultOk) {
+                promise->setFailed(result);
+                return;
+            }
+            promise->setValue(schemaInfo);
+        });
+}
+
 void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const 
std::string& nsName, Result result,
                                                                const 
ClientConnectionWeakPtr& clientCnx,
                                                                
NamespaceTopicsPromisePtr promise) {
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index 9adb648..9740fb1 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -20,6 +20,7 @@
 #define _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_
 
 #include <pulsar/Authentication.h>
+#include <pulsar/Schema.h>
 
 #include <mutex>
 
@@ -32,6 +33,7 @@ class ConnectionPool;
 class LookupDataResult;
 class ServiceNameResolver;
 using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, 
NamespaceTopicsPtr>>;
+using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, 
boost::optional<SchemaInfo>>>;
 
 class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
    public:
@@ -45,6 +47,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public 
LookupService {
 
     Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const 
NamespaceNamePtr& nsName) override;
 
+    Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& 
topicName) override;
+
    private:
     std::mutex mutex_;
     uint64_t requestIdGenerator_ = 0;
@@ -68,6 +72,9 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public 
LookupService {
                                          const ClientConnectionWeakPtr& 
clientCnx,
                                          NamespaceTopicsPromisePtr promise);
 
+    void sendGetSchemaRequest(const std::string& topiName, Result result,
+                              const ClientConnectionWeakPtr& clientCnx, 
GetSchemaPromisePtr promise);
+
     void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr 
topicsPtr,
                                       NamespaceTopicsPromisePtr promise);
 
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 260aacd..16373c6 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1308,6 +1308,52 @@ void 
ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
                     break;
                 }
 
+                case BaseCommand::GET_SCHEMA_RESPONSE: {
+                    const auto& response = incomingCmd.getschemaresponse();
+                    LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from 
server. req_id: "
+                                         << response.request_id());
+                    Lock lock(mutex_);
+                    auto it = 
pendingGetSchemaRequests_.find(response.request_id());
+                    if (it != pendingGetSchemaRequests_.end()) {
+                        Promise<Result, boost::optional<SchemaInfo>> 
getSchemaPromise = it->second;
+                        pendingGetSchemaRequests_.erase(it);
+                        lock.unlock();
+
+                        if (response.has_error_code()) {
+                            if (response.error_code() == proto::TopicNotFound) 
{
+                                getSchemaPromise.setValue(boost::none);
+                            } else {
+                                Result result = 
getResult(response.error_code(), response.error_message());
+                                LOG_WARN(cnxString_ << "Received error 
GetSchemaResponse from server "
+                                                    << result
+                                                    << 
(response.has_error_message()
+                                                            ? (" (" + 
response.error_message() + ")")
+                                                            : "")
+                                                    << " -- req_id: " << 
response.request_id());
+                                getSchemaPromise.setFailed(result);
+                            }
+                            return;
+                        }
+
+                        const auto& schema = response.schema();
+                        const auto& properMap = schema.properties();
+                        StringMap properties;
+                        for (auto kv = properMap.begin(); kv != 
properMap.end(); ++kv) {
+                            properties[kv->key()] = kv->value();
+                        }
+                        SchemaInfo 
schemaInfo(static_cast<SchemaType>(schema.type()), "",
+                                              schema.schema_data(), 
properties);
+                        getSchemaPromise.setValue(schemaInfo);
+                    } else {
+                        lock.unlock();
+                        LOG_WARN(
+                            "GetSchemaResponse command - Received unknown 
request id from "
+                            "server: "
+                            << response.request_id());
+                    }
+                    break;
+                }
+
                 default: {
                     LOG_WARN(cnxString_ << "Received invalid message from 
server");
                     close();
@@ -1708,6 +1754,23 @@ Future<Result, NamespaceTopicsPtr> 
ClientConnection::newGetTopicsOfNamespace(con
     return promise.getFuture();
 }
 
+Future<Result, boost::optional<SchemaInfo>> 
ClientConnection::newGetSchema(const std::string& topicName,
+                                                                           
uint64_t requestId) {
+    Lock lock(mutex_);
+    Promise<Result, boost::optional<SchemaInfo>> promise;
+    if (isClosed()) {
+        lock.unlock();
+        LOG_ERROR(cnxString_ << "Client is not connected to the broker");
+        promise.setFailed(ResultNotConnected);
+        return promise.getFuture();
+    }
+
+    pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise));
+    lock.unlock();
+    sendCommand(Commands::newGetSchema(topicName, requestId));
+    return promise.getFuture();
+}
+
 void ClientConnection::closeSocket() {
     boost::system::error_code err;
     if (socket_) {
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 0f322a8..c77ca78 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -169,6 +169,9 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
 
     Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const 
std::string& nsName, uint64_t requestId);
 
+    Future<Result, boost::optional<SchemaInfo>> newGetSchema(const 
std::string& topicName,
+                                                             uint64_t 
requestId);
+
    private:
     struct PendingRequestData {
         Promise<Result, ResponseData> promise;
@@ -327,6 +330,9 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> 
PendingGetNamespaceTopicsMap;
     PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
 
+    typedef std::map<long, Promise<Result, boost::optional<SchemaInfo>>> 
PendingGetSchemaMap;
+    PendingGetSchemaMap pendingGetSchemaRequests_;
+
     mutable std::mutex mutex_;
     typedef std::unique_lock<std::mutex> Lock;
 
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index a9c1653..ec9de8a 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -142,7 +142,7 @@ ExecutorServiceProviderPtr 
ClientImpl::getPartitionListenerExecutorProvider() {
 LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }
 
 void ClientImpl::createProducerAsync(const std::string& topic, 
ProducerConfiguration conf,
-                                     CreateProducerCallback callback) {
+                                     CreateProducerCallback callback, bool 
autoDownloadSchema) {
     if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) {
         throw std::invalid_argument("Batching and chunking of messages can't 
be enabled together");
     }
@@ -159,9 +159,28 @@ void ClientImpl::createProducerAsync(const std::string& 
topic, ProducerConfigura
             return;
         }
     }
-    lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
-        std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), 
std::placeholders::_1,
-                  std::placeholders::_2, topicName, conf, callback));
+
+    if (autoDownloadSchema) {
+        auto self = shared_from_this();
+        auto confPtr = std::make_shared<ProducerConfiguration>(conf);
+        lookupServicePtr_->getSchema(topicName).addListener(
+            [self, topicName, confPtr, callback](Result res, 
boost::optional<SchemaInfo> topicSchema) {
+                if (res != ResultOk) {
+                    callback(res, Producer());
+                }
+                if (topicSchema) {
+                    confPtr->setSchema(topicSchema.get());
+                }
+
+                
self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+                    std::bind(&ClientImpl::handleCreateProducer, self, 
std::placeholders::_1,
+                              std::placeholders::_2, topicName, *confPtr, 
callback));
+            });
+    } else {
+        lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+            std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), 
std::placeholders::_1,
+                      std::placeholders::_2, topicName, conf, callback));
+    }
 }
 
 void ClientImpl::handleCreateProducer(const Result result, const 
LookupDataResultPtr partitionMetadata,
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 8a39396..af2559d 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -68,8 +68,12 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
                bool poolConnections);
     ~ClientImpl();
 
+    /**
+     * @param autoDownloadSchema When it is true, Before creating a producer, 
it will try to get the schema
+     * that exists for the topic.
+     */
     void createProducerAsync(const std::string& topic, ProducerConfiguration 
conf,
-                             CreateProducerCallback callback);
+                             CreateProducerCallback callback, bool 
autoDownloadSchema = false);
 
     void subscribeAsync(const std::string& topic, const std::string& 
subscriptionName,
                         const ConsumerConfiguration& conf, SubscribeCallback 
callback);
diff --git a/lib/Commands.cc b/lib/Commands.cc
index db993bd..bfd9a46 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -161,6 +161,21 @@ SharedBuffer Commands::newLookup(const std::string& topic, 
const bool authoritat
     return buffer;
 }
 
+SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t 
requestId) {
+    static BaseCommand cmd;
+    static std::mutex mutex;
+    std::lock_guard<std::mutex> lock(mutex);
+    cmd.set_type(BaseCommand::GET_SCHEMA);
+
+    auto getSchema = cmd.mutable_getschema();
+    getSchema->set_topic(topic);
+    getSchema->set_request_id(requestId);
+
+    const SharedBuffer buffer = writeMessageWithSize(cmd);
+    cmd.clear_getschema();
+    return buffer;
+}
+
 SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t 
requestId) {
     static BaseCommand cmd;
     static std::mutex mutex;
@@ -872,5 +887,6 @@ bool 
Commands::peerSupportsMultiMessageAcknowledgement(int32_t peerVersion) {
 bool Commands::peerSupportsJsonSchemaAvroFormat(int32_t peerVersion) { return 
peerVersion >= proto::v13; }
 
 bool Commands::peerSupportsGetOrCreateSchema(int32_t peerVersion) { return 
peerVersion >= proto::v15; }
+
 }  // namespace pulsar
 /* namespace pulsar */
diff --git a/lib/Commands.h b/lib/Commands.h
index e7746d2..7d13e2a 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -91,6 +91,8 @@ class Commands {
     static SharedBuffer newLookup(const std::string& topic, const bool 
authoritative, uint64_t requestId,
                                   const std::string& listenerName);
 
+    static SharedBuffer newGetSchema(const std::string& topic, uint64_t 
requestId);
+
     static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& 
cmd, uint64_t producerId,
                                     uint64_t sequenceId, ChecksumType 
checksumType,
                                     const proto::MessageMetadata& metadata, 
const SharedBuffer& payload);
diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc
index 8167b64..ae719a5 100644
--- a/lib/HTTPLookupService.cc
+++ b/lib/HTTPLookupService.cc
@@ -27,6 +27,7 @@
 #include "ExecutorService.h"
 #include "LogUtils.h"
 #include "NamespaceName.h"
+#include "SchemaUtils.h"
 #include "ServiceNameResolver.h"
 #include "TopicName.h"
 namespace ptree = boost::property_tree;
@@ -143,6 +144,25 @@ Future<Result, NamespaceTopicsPtr> 
HTTPLookupService::getTopicsOfNamespaceAsync(
     return promise.getFuture();
 }
 
+Future<Result, boost::optional<SchemaInfo>> HTTPLookupService::getSchema(const 
TopicNamePtr &topicName) {
+    Promise<Result, boost::optional<SchemaInfo>> promise;
+    std::stringstream completeUrlStream;
+
+    const auto &url = serviceNameResolver_.resolveHost();
+    if (topicName->isV2Topic()) {
+        completeUrlStream << url << ADMIN_PATH_V2 << "schemas/" << 
topicName->getProperty() << '/'
+                          << topicName->getNamespacePortion() << '/' << 
topicName->getEncodedLocalName()
+                          << "/schema";
+    } else {
+        completeUrlStream << url << ADMIN_PATH_V1 << "schemas/" << 
topicName->getProperty() << '/'
+                          << topicName->getCluster() << '/' << 
topicName->getNamespacePortion() << '/'
+                          << topicName->getEncodedLocalName() << "/schema";
+    }
+    
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest,
+                                                 shared_from_this(), promise, 
completeUrlStream.str()));
+    return promise.getFuture();
+}
+
 static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, 
void *responseDataPtr) {
     ((std::string *)responseDataPtr)->append((char *)contents, size * nmemb);
     return size * nmemb;
@@ -161,6 +181,12 @@ void 
HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise
 }
 
 Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string 
&responseData) {
+    long responseCode = -1;
+    return sendHTTPRequest(completeUrl, responseData, responseCode);
+}
+
+Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string 
&responseData,
+                                          long &responseCode) {
     uint16_t reqCount = 0;
     Result retResult = ResultOk;
     while (++reqCount <= MAX_HTTP_REDIRECTS) {
@@ -253,9 +279,8 @@ Result HTTPLookupService::sendHTTPRequest(std::string 
completeUrl, std::string &
         // Make get call to server
         res = curl_easy_perform(handle);
 
-        long response_code = -1;
-        curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
-        LOG_INFO("Response received for url " << completeUrl << " 
response_code " << response_code
+        curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &responseCode);
+        LOG_INFO("Response received for url " << completeUrl << " responseCode 
" << responseCode
                                               << " curl res " << res);
 
         // Free header list
@@ -263,12 +288,9 @@ Result HTTPLookupService::sendHTTPRequest(std::string 
completeUrl, std::string &
 
         switch (res) {
             case CURLE_OK:
-                long response_code;
-                curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, 
&response_code);
-                LOG_INFO("Response received for url " << completeUrl << " code 
" << response_code);
-                if (response_code == 200) {
+                if (responseCode == 200) {
                     retResult = ResultOk;
-                } else if (needRedirection(response_code)) {
+                } else if (needRedirection(responseCode)) {
                     char *url = NULL;
                     curl_easy_getinfo(handle, CURLINFO_REDIRECT_URL, &url);
                     LOG_INFO("Response from url " << completeUrl << " to new 
url " << url);
@@ -302,7 +324,7 @@ Result HTTPLookupService::sendHTTPRequest(std::string 
completeUrl, std::string &
                 break;
         }
         curl_easy_cleanup(handle);
-        if (!needRedirection(response_code)) {
+        if (!needRedirection(responseCode)) {
             break;
         }
     }
@@ -409,4 +431,73 @@ void 
HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std
     }
 }
 
+void HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, 
const std::string completeUrl) {
+    std::string responseData;
+    long responseCode = -1;
+    Result result = sendHTTPRequest(completeUrl, responseData, responseCode);
+
+    if (responseCode == 404) {
+        promise.setValue(boost::none);
+    } else if (result != ResultOk) {
+        promise.setFailed(result);
+    } else {
+        ptree::ptree root;
+        std::stringstream stream(responseData);
+        try {
+            ptree::read_json(stream, root);
+        } catch (ptree::json_parser_error &e) {
+            LOG_ERROR("Failed to parse json of Partition Metadata: " << 
e.what()
+                                                                     << 
"\nInput Json = " << responseData);
+            promise.setFailed(ResultInvalidMessage);
+            return;
+        }
+        const std::string defaultNotFoundString = "Not found";
+        auto schemaTypeStr = root.get<std::string>("type", 
defaultNotFoundString);
+        if (schemaTypeStr == defaultNotFoundString) {
+            LOG_ERROR("malformed json! - type not present" << responseData);
+            promise.setFailed(ResultInvalidMessage);
+            return;
+        }
+        auto schemaData = root.get<std::string>("data", defaultNotFoundString);
+        if (schemaData == defaultNotFoundString) {
+            LOG_ERROR("malformed json! - data not present" << responseData);
+            promise.setFailed(ResultInvalidMessage);
+            return;
+        }
+
+        auto schemaType = enumSchemaType(schemaTypeStr);
+        if (schemaType == KEY_VALUE) {
+            ptree::ptree kvRoot;
+            std::stringstream kvStream(schemaData);
+            try {
+                ptree::read_json(kvStream, kvRoot);
+            } catch (ptree::json_parser_error &e) {
+                LOG_ERROR("Failed to parse json of Partition Metadata: " << 
e.what()
+                                                                         << 
"\nInput Json = " << schemaData);
+                promise.setFailed(ResultInvalidMessage);
+                return;
+            }
+            std::stringstream keyStream;
+            ptree::write_json(keyStream, kvRoot.get_child("key"), false);
+            std::stringstream valueStream;
+            ptree::write_json(valueStream, kvRoot.get_child("value"), false);
+            auto keyData = keyStream.str();
+            auto valueData = valueStream.str();
+            // Remove the last line break.
+            keyData.pop_back();
+            valueData.pop_back();
+            schemaData = mergeKeyValueSchema(keyData, valueData);
+        }
+
+        StringMap properties;
+        auto propertiesTree = root.get_child("properties");
+        for (const auto &item : propertiesTree) {
+            properties[item.first] = item.second.get_value<std::string>();
+        }
+
+        SchemaInfo schemaInfo = SchemaInfo(schemaType, "", schemaData, 
properties);
+        promise.setValue(schemaInfo);
+    }
+}
+
 }  // namespace pulsar
diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h
index 929d7ab..f76d3e6 100644
--- a/lib/HTTPLookupService.h
+++ b/lib/HTTPLookupService.h
@@ -28,6 +28,7 @@ namespace pulsar {
 class ServiceNameResolver;
 using NamespaceTopicsPromise = Promise<Result, NamespaceTopicsPtr>;
 using NamespaceTopicsPromisePtr = std::shared_ptr<NamespaceTopicsPromise>;
+using GetSchemaPromise = Promise<Result, boost::optional<SchemaInfo>>;
 
 class HTTPLookupService : public LookupService, public 
std::enable_shared_from_this<HTTPLookupService> {
     class CurlInitializer {
@@ -62,9 +63,12 @@ class HTTPLookupService : public LookupService, public 
std::enable_shared_from_t
 
     void handleLookupHTTPRequest(LookupPromise, const std::string, 
RequestType);
     void handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise, 
const std::string completeUrl);
+    void handleGetSchemaHTTPRequest(GetSchemaPromise promise, const 
std::string completeUrl);
 
     Result sendHTTPRequest(std::string completeUrl, std::string& responseData);
 
+    Result sendHTTPRequest(std::string completeUrl, std::string& responseData, 
long& responseCode);
+
    public:
     HTTPLookupService(ServiceNameResolver&, const ClientConfiguration&, const 
AuthenticationPtr&);
 
@@ -72,6 +76,8 @@ class HTTPLookupService : public LookupService, public 
std::enable_shared_from_t
 
     Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr&) override;
 
+    Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& 
topicName) override;
+
     Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const 
NamespaceNamePtr& nsName) override;
 };
 }  // namespace pulsar
diff --git a/lib/LookupService.h b/lib/LookupService.h
index 6af290c..f1c5de8 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -20,7 +20,9 @@
 #define PULSAR_CPP_LOOKUPSERVICE_H
 
 #include <pulsar/Result.h>
+#include <pulsar/Schema.h>
 
+#include <boost/optional.hpp>
 #include <memory>
 #include <ostream>
 #include <vector>
@@ -72,6 +74,14 @@ class LookupService {
      */
     virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const 
NamespaceNamePtr& nsName) = 0;
 
+    /**
+     * returns current SchemaInfo {@link SchemaInfo} for a given topic.
+     *
+     * @param topicName topic-name
+     * @return SchemaInfo
+     */
+    virtual Future<Result, boost::optional<SchemaInfo>> getSchema(const 
TopicNamePtr& topicName) = 0;
+
     virtual ~LookupService() {}
 };
 
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index 7d704ec..f16ff75 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -66,6 +66,12 @@ class RetryableLookupService : public LookupService,
             [this, nsName] { return 
lookupService_->getTopicsOfNamespaceAsync(nsName); });
     }
 
+    Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& 
topicName) override {
+        return executeAsync<boost::optional<SchemaInfo>>(
+            "get-schema" + topicName->toString(),
+            [this, topicName] { return lookupService_->getSchema(topicName); 
});
+    }
+
     template <typename T>
     Future<Result, T> executeAsync(const std::string& key, 
std::function<Future<Result, T>()> f) {
         Promise<Result, T> promise;
diff --git a/lib/Schema.cc b/lib/Schema.cc
index 300c91d..d77822a 100644
--- a/lib/Schema.cc
+++ b/lib/Schema.cc
@@ -25,7 +25,8 @@
 #include <map>
 #include <memory>
 
-#include "SharedBuffer.h"
+#include "SchemaUtils.h"
+
 using boost::property_tree::ptree;
 using boost::property_tree::read_json;
 using boost::property_tree::write_json;
@@ -40,14 +41,6 @@ PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, 
pulsar::KeyValueEncoding
 
 namespace pulsar {
 
-static const std::string KEY_SCHEMA_NAME = "key.schema.name";
-static const std::string KEY_SCHEMA_TYPE = "key.schema.type";
-static const std::string KEY_SCHEMA_PROPS = "key.schema.properties";
-static const std::string VALUE_SCHEMA_NAME = "value.schema.name";
-static const std::string VALUE_SCHEMA_TYPE = "value.schema.type";
-static const std::string VALUE_SCHEMA_PROPS = "value.schema.properties";
-static const std::string KV_ENCODING_TYPE = "kv.encoding.type";
-
 PULSAR_PUBLIC const char *strEncodingType(KeyValueEncodingType encodingType) {
     switch (encodingType) {
         case KeyValueEncodingType::INLINE:
@@ -112,6 +105,44 @@ PULSAR_PUBLIC const char *strSchemaType(SchemaType 
schemaType) {
     return "UnknownSchemaType";
 }
 
+PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr) {
+    if (schemaTypeStr == "NONE") {
+        return NONE;
+    } else if (schemaTypeStr == "STRING") {
+        return STRING;
+    } else if (schemaTypeStr == "INT8") {
+        return INT8;
+    } else if (schemaTypeStr == "INT16") {
+        return INT16;
+    } else if (schemaTypeStr == "INT32") {
+        return INT32;
+    } else if (schemaTypeStr == "INT64") {
+        return INT64;
+    } else if (schemaTypeStr == "FLOAT") {
+        return FLOAT;
+    } else if (schemaTypeStr == "DOUBLE") {
+        return DOUBLE;
+    } else if (schemaTypeStr == "BYTES") {
+        return BYTES;
+    } else if (schemaTypeStr == "JSON") {
+        return JSON;
+    } else if (schemaTypeStr == "PROTOBUF") {
+        return PROTOBUF;
+    } else if (schemaTypeStr == "AVRO") {
+        return AVRO;
+    } else if (schemaTypeStr == "AUTO_CONSUME") {
+        return AUTO_CONSUME;
+    } else if (schemaTypeStr == "AUTO_PUBLISH") {
+        return AUTO_PUBLISH;
+    } else if (schemaTypeStr == "KEY_VALUE") {
+        return KEY_VALUE;
+    } else if (schemaTypeStr == "PROTOBUF_NATIVE") {
+        return PROTOBUF_NATIVE;
+    } else {
+        throw std::invalid_argument("No match schema type: " + schemaTypeStr);
+    }
+}
+
 class PULSAR_PUBLIC SchemaInfoImpl {
    public:
     const std::string name_;
@@ -134,18 +165,6 @@ SchemaInfo::SchemaInfo(SchemaType schemaType, const 
std::string &name, const std
 
 SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo 
&valueSchema,
                        const KeyValueEncodingType &keyValueEncodingType) {
-    std::string keySchemaStr = keySchema.getSchema();
-    std::string valueSchemaStr = valueSchema.getSchema();
-    uint32_t keySize = keySchemaStr.size();
-    uint32_t valueSize = valueSchemaStr.size();
-
-    auto buffSize = sizeof keySize + keySize + sizeof valueSize + valueSize;
-    SharedBuffer buffer = SharedBuffer::allocate(buffSize);
-    buffer.writeUnsignedInt(keySize == 0 ? INVALID_SIZE : 
static_cast<uint32_t>(keySize));
-    buffer.write(keySchemaStr.c_str(), static_cast<uint32_t>(keySize));
-    buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE : 
static_cast<uint32_t>(valueSize));
-    buffer.write(valueSchemaStr.c_str(), static_cast<uint32_t>(valueSize));
-
     auto writeJson = [](const StringMap &properties) {
         ptree pt;
         for (auto &entry : properties) {
@@ -167,8 +186,10 @@ SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const 
SchemaInfo &valueSchem
     properties.emplace(VALUE_SCHEMA_PROPS, 
writeJson(valueSchema.getProperties()));
     properties.emplace(KV_ENCODING_TYPE, 
strEncodingType(keyValueEncodingType));
 
-    impl_ = std::make_shared<SchemaInfoImpl>(KEY_VALUE, "KeyValue", 
std::string(buffer.data(), buffSize),
-                                             properties);
+    std::string keySchemaStr = keySchema.getSchema();
+    std::string valueSchemaStr = valueSchema.getSchema();
+    impl_ = std::make_shared<SchemaInfoImpl>(KEY_VALUE, "KeyValue",
+                                             mergeKeyValueSchema(keySchemaStr, 
valueSchemaStr), properties);
 }
 
 SchemaType SchemaInfo::getSchemaType() const { return impl_->type_; }
diff --git a/lib/SchemaUtils.h b/lib/SchemaUtils.h
new file mode 100644
index 0000000..a298a03
--- /dev/null
+++ b/lib/SchemaUtils.h
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef SCHEMA_UTILS_HPP_
+#define SCHEMA_UTILS_HPP_
+
+#include "SharedBuffer.h"
+
+namespace pulsar {
+
+static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
+static const std::string KEY_SCHEMA_NAME = "key.schema.name";
+static const std::string KEY_SCHEMA_TYPE = "key.schema.type";
+static const std::string KEY_SCHEMA_PROPS = "key.schema.properties";
+static const std::string VALUE_SCHEMA_NAME = "value.schema.name";
+static const std::string VALUE_SCHEMA_TYPE = "value.schema.type";
+static const std::string VALUE_SCHEMA_PROPS = "value.schema.properties";
+static const std::string KV_ENCODING_TYPE = "kv.encoding.type";
+
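+/**
+ * Merge keySchemaData and valueSchemaData: key first, then value, each written as a uint32 size (INVALID_SIZE when that part is empty) followed by its bytes.
+ * @return the merged buffer contents as a std::string
+ */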
+static std::string mergeKeyValueSchema(const std::string& keySchemaData, const 
std::string& valueSchemaData) {
+    uint32_t keySize = keySchemaData.size();
+    uint32_t valueSize = valueSchemaData.size();
+
+    auto buffSize = sizeof keySize + keySize + sizeof valueSize + valueSize;
+    SharedBuffer buffer = SharedBuffer::allocate(buffSize);
+    buffer.writeUnsignedInt(keySize == 0 ? INVALID_SIZE : 
static_cast<uint32_t>(keySize));
+    buffer.write(keySchemaData.c_str(), static_cast<uint32_t>(keySize));
+    buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE : 
static_cast<uint32_t>(valueSize));
+    buffer.write(valueSchemaData.c_str(), static_cast<uint32_t>(valueSize));
+
+    return std::string(buffer.data(), buffSize);
+}
+
+}  // namespace pulsar
+
+#endif /* SCHEMA_UTILS_HPP_ */
diff --git a/test-conf/standalone-ssl.conf b/test-conf/standalone-ssl.conf
index ee7c16c..1e54360 100644
--- a/test-conf/standalone-ssl.conf
+++ b/test-conf/standalone-ssl.conf
@@ -50,6 +50,9 @@ brokerShutdownTimeoutMs=3000
 # Enable backlog quota check. Enforces action on topic when the quota is 
reached
 backlogQuotaCheckEnabled=true
 
+# Enforce schema validation: if a producer doesn't carry a schema, the 
producer is rejected when it connects to a topic that already has a schema.
+isSchemaValidationEnforced=true
+
 # How often to check for topics that have reached the quota
 backlogQuotaCheckIntervalInSeconds=60
 
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 5ac4122..e31246f 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -39,6 +39,9 @@ using namespace pulsar;
 
 DECLARE_LOG_OBJECT()
 
+static std::string binaryLookupUrl = "pulsar://localhost:6650";
+static std::string httpLookupUrl = "http://localhost:8080";
+
 TEST(LookupServiceTest, basicLookup) {
     ExecutorServiceProviderPtr service = 
std::make_shared<ExecutorServiceProvider>(1);
     AuthenticationPtr authData = AuthFactory::Disabled();
@@ -274,3 +277,82 @@ TEST(LookupServiceTest, testTimeout) {
 
     ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
 }
+
+class LookupServiceTest : public ::testing::TestWithParam<std::string> {
+   public:
+    void TearDown() override { client_.close(); }
+
+   protected:
+    Client client_{GetParam()};
+};
+
+TEST_P(LookupServiceTest, testGetSchema) {
+    const std::string topic = "testGetSchema" + std::to_string(time(nullptr)) 
+ GetParam().substr(0, 4);
+    std::string jsonSchema =
+        
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+
+    StringMap properties;
+    properties.emplace("key1", "value1");
+    properties.emplace("key2", "value2");
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setSchema(SchemaInfo(SchemaType::JSON, "json", 
jsonSchema, properties));
+    Producer producer;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, 
producer));
+
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
+    auto lookup = clientImplPtr->getLookup();
+
+    boost::optional<SchemaInfo> schemaInfo;
+    auto future = lookup->getSchema(TopicName::get(topic));
+    ASSERT_EQ(ResultOk, future.get(schemaInfo));
+    ASSERT_EQ(jsonSchema, schemaInfo->getSchema());
+    ASSERT_EQ(SchemaType::JSON, schemaInfo->getSchemaType());
+    ASSERT_EQ(properties, schemaInfo->getProperties());
+}
+
+TEST_P(LookupServiceTest, testGetSchemaNotFund) {
+    const std::string topic =
+        "testGetSchemaNotFund" + std::to_string(time(nullptr)) + 
GetParam().substr(0, 4);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producer));
+
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
+    auto lookup = clientImplPtr->getLookup();
+
+    boost::optional<SchemaInfo> schemaInfo;
+    auto future = lookup->getSchema(TopicName::get(topic));
+    ASSERT_EQ(ResultOk, future.get(schemaInfo));
+    ASSERT_FALSE(schemaInfo);
+}
+
+TEST_P(LookupServiceTest, testGetKeyValueSchema) {
+    const std::string topic =
+        "testGetKeyValueSchema" + std::to_string(time(nullptr)) + 
GetParam().substr(0, 4);
+    StringMap properties;
+    properties.emplace("key1", "value1");
+    properties.emplace("key2", "value2");
+    std::string jsonSchema =
+        
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo keySchema(JSON, "key-json", jsonSchema, properties);
+    SchemaInfo valueSchema(JSON, "value-json", jsonSchema, properties);
+    SchemaInfo keyValueSchema(keySchema, valueSchema, 
KeyValueEncodingType::INLINE);
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setSchema(keyValueSchema);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, 
producer));
+
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
+    auto lookup = clientImplPtr->getLookup();
+
+    boost::optional<SchemaInfo> schemaInfo;
+    auto future = lookup->getSchema(TopicName::get(topic));
+    ASSERT_EQ(ResultOk, future.get(schemaInfo));
+    ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo->getSchema());
+    ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo->getSchemaType());
+    ASSERT_FALSE(schemaInfo->getProperties().empty());
+}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, LookupServiceTest, 
::testing::Values(binaryLookupUrl, httpLookupUrl));
\ No newline at end of file
diff --git a/tests/SchemaTest.cc b/tests/SchemaTest.cc
index cf0f8c7..34e224e 100644
--- a/tests/SchemaTest.cc
+++ b/tests/SchemaTest.cc
@@ -19,12 +19,12 @@
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 
+#include "PulsarFriend.h"
 #include "SharedBuffer.h"
 
 using namespace pulsar;
 
 static std::string lookupUrl = "pulsar://localhost:6650";
-
 static const std::string exampleSchema =
     
R"({"type":"record","name":"Example","namespace":"test","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})";
 
@@ -50,11 +50,10 @@ TEST(SchemaTest, testSchema) {
     res = client.createProducer("topic-avro", producerConf, producer);
     ASSERT_EQ(ResultIncompatibleSchema, res);
 
-    // Creating producer with no schema on same topic should succeed
-    // because standalone broker is configured by default to not
-    // require the schema to be set
+    // Creating a producer with no schema on the same topic should fail,
+    // because the broker config sets isSchemaValidationEnforced=true.
     res = client.createProducer("topic-avro", producer);
-    ASSERT_EQ(ResultOk, res);
+    ASSERT_EQ(ResultIncompatibleSchema, res);
 
     ConsumerConfiguration consumerConf;
     Consumer consumer;
@@ -154,3 +153,33 @@ TEST(SchemaTest, testValueSchemaIsEmpty) {
     int valueSchemaSize = buffer.readUnsignedInt();
     ASSERT_EQ(valueSchemaSize, -1);
 }
+
+TEST(SchemaTest, testAutoDownloadSchema) {
+    const std::string topic = "testAutoPublicSchema" + 
std::to_string(time(nullptr));
+    std::string jsonSchema =
+        
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schema(JSON, "test-schema", jsonSchema);
+
+    Client client(lookupUrl);
+
+    ConsumerConfiguration consumerConfiguration;
+    consumerConfiguration.setSchema(schema);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", 
consumerConfiguration, consumer));
+
+    ProducerConfiguration producerConfiguration;
+    Producer producer;
+
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+
+    Promise<Result, Producer> promise;
+    clientImplPtr->createProducerAsync(topic, producerConfiguration, 
WaitForCallbackValue<Producer>(promise),
+                                       true);
+    ASSERT_EQ(ResultOk, promise.getFuture().get(producer));
+
+    Message msg = MessageBuilder().setContent("content").build();
+    ASSERT_EQ(ResultOk, producer.send(msg));
+
+    ASSERT_EQ(ResultOk, consumer.receive(msg));
+    ASSERT_EQ("content", msg.getDataAsString());
+}


Reply via email to