This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push: new 96507ce [feat] Support auto download schema when create producer. (#157) 96507ce is described below commit 96507cecdc90f10eca5febc48a0623f72d338615 Author: Baodi Shi <ba...@apache.org> AuthorDate: Thu Jan 19 16:40:35 2023 +0800 [feat] Support auto download schema when create producer. (#157) ### Motivation If a schema already exists for a topic but the producer does not know it, the producer may still want to send data to that topic. (This is the scenario in which the DLQ sends a message; please refer to #139 **[Verifying this change][5]**.) ### Modifications - When creating a producer with the `autoDownloadSchema` param, try to get the schema of that topic and use it. - Support `getSchema` on `LookupService` (HTTP and Binary). ### Verifying this change - Add `LookupServiceTest` unit test to cover `getSchema` logic. - Add `SchemaTest.testAutoDownloadSchema` unit test to cover successful producer creation when the topic has a schema. Co-authored-by: Baodi Shi <bao...@apache.org> --- include/pulsar/Schema.h | 3 +- lib/BinaryProtoLookupService.cc | 37 ++++++++++++++ lib/BinaryProtoLookupService.h | 7 +++ lib/ClientConnection.cc | 63 +++++++++++++++++++++++ lib/ClientConnection.h | 6 +++ lib/ClientImpl.cc | 27 ++++++++-- lib/ClientImpl.h | 6 ++- lib/Commands.cc | 16 ++++++ lib/Commands.h | 2 + lib/HTTPLookupService.cc | 109 ++++++++++++++++++++++++++++++++++++---- lib/HTTPLookupService.h | 6 +++ lib/LookupService.h | 10 ++++ lib/RetryableLookupService.h | 6 +++ lib/Schema.cc | 67 +++++++++++++++--------- lib/SchemaUtils.h | 55 ++++++++++++++++++++ test-conf/standalone-ssl.conf | 3 ++ tests/LookupServiceTest.cc | 82 ++++++++++++++++++++++++++++++ tests/SchemaTest.cc | 39 ++++++++++++-- 18 files changed, 501 insertions(+), 43 deletions(-) diff --git a/include/pulsar/Schema.h b/include/pulsar/Schema.h index ad64c0b..93dac4f 100644 --- a/include/pulsar/Schema.h +++ b/include/pulsar/Schema.h @@ -134,6 +134,8 @@ enum SchemaType // Return string 
representation of result code PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType); +PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr); + class SchemaInfoImpl; typedef std::map<std::string, std::string> StringMap; @@ -195,7 +197,6 @@ class PULSAR_PUBLIC SchemaInfo { private: typedef std::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr; SchemaInfoImplPtr impl_; - static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF; }; } // namespace pulsar diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc index b925db4..9ce3369 100644 --- a/lib/BinaryProtoLookupService.cc +++ b/lib/BinaryProtoLookupService.cc @@ -155,6 +155,43 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac return promise->getFuture(); } +Future<Result, boost::optional<SchemaInfo>> BinaryProtoLookupService::getSchema( + const TopicNamePtr& topicName) { + GetSchemaPromisePtr promise = std::make_shared<Promise<Result, boost::optional<SchemaInfo>>>(); + + if (!topicName) { + promise->setFailed(ResultInvalidTopicName); + return promise->getFuture(); + } + cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost()) + .addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(), + std::placeholders::_1, std::placeholders::_2, promise)); + + return promise->getFuture(); +} + +void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, Result result, + const ClientConnectionWeakPtr& clientCnx, + GetSchemaPromisePtr promise) { + if (result != ResultOk) { + promise->setFailed(result); + return; + } + + ClientConnectionPtr conn = clientCnx.lock(); + uint64_t requestId = newRequestId(); + LOG_DEBUG("sendGetSchemaRequest. 
requestId: " << requestId << " topicName: " << topicName); + + conn->newGetSchema(topicName, requestId) + .addListener([promise](Result result, boost::optional<SchemaInfo> schemaInfo) { + if (result != ResultOk) { + promise->setFailed(result); + return; + } + promise->setValue(schemaInfo); + }); +} + void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result, const ClientConnectionWeakPtr& clientCnx, NamespaceTopicsPromisePtr promise) { diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index 9adb648..9740fb1 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -20,6 +20,7 @@ #define _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_ #include <pulsar/Authentication.h> +#include <pulsar/Schema.h> #include <mutex> @@ -32,6 +33,7 @@ class ConnectionPool; class LookupDataResult; class ServiceNameResolver; using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, NamespaceTopicsPtr>>; +using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, boost::optional<SchemaInfo>>>; class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { public: @@ -45,6 +47,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override; + Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override; + private: std::mutex mutex_; uint64_t requestIdGenerator_ = 0; @@ -68,6 +72,9 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { const ClientConnectionWeakPtr& clientCnx, NamespaceTopicsPromisePtr promise); + void sendGetSchemaRequest(const std::string& topiName, Result result, + const ClientConnectionWeakPtr& clientCnx, GetSchemaPromisePtr promise); + void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr, NamespaceTopicsPromisePtr promise); diff --git a/lib/ClientConnection.cc 
b/lib/ClientConnection.cc index 260aacd..16373c6 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1308,6 +1308,52 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { break; } + case BaseCommand::GET_SCHEMA_RESPONSE: { + const auto& response = incomingCmd.getschemaresponse(); + LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: " + << response.request_id()); + Lock lock(mutex_); + auto it = pendingGetSchemaRequests_.find(response.request_id()); + if (it != pendingGetSchemaRequests_.end()) { + Promise<Result, boost::optional<SchemaInfo>> getSchemaPromise = it->second; + pendingGetSchemaRequests_.erase(it); + lock.unlock(); + + if (response.has_error_code()) { + if (response.error_code() == proto::TopicNotFound) { + getSchemaPromise.setValue(boost::none); + } else { + Result result = getResult(response.error_code(), response.error_message()); + LOG_WARN(cnxString_ << "Received error GetSchemaResponse from server " + << result + << (response.has_error_message() + ? 
(" (" + response.error_message() + ")") + : "") + << " -- req_id: " << response.request_id()); + getSchemaPromise.setFailed(result); + } + return; + } + + const auto& schema = response.schema(); + const auto& properMap = schema.properties(); + StringMap properties; + for (auto kv = properMap.begin(); kv != properMap.end(); ++kv) { + properties[kv->key()] = kv->value(); + } + SchemaInfo schemaInfo(static_cast<SchemaType>(schema.type()), "", + schema.schema_data(), properties); + getSchemaPromise.setValue(schemaInfo); + } else { + lock.unlock(); + LOG_WARN( + "GetSchemaResponse command - Received unknown request id from " + "server: " + << response.request_id()); + } + break; + } + default: { LOG_WARN(cnxString_ << "Received invalid message from server"); close(); @@ -1708,6 +1754,23 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(con return promise.getFuture(); } +Future<Result, boost::optional<SchemaInfo>> ClientConnection::newGetSchema(const std::string& topicName, + uint64_t requestId) { + Lock lock(mutex_); + Promise<Result, boost::optional<SchemaInfo>> promise; + if (isClosed()) { + lock.unlock(); + LOG_ERROR(cnxString_ << "Client is not connected to the broker"); + promise.setFailed(ResultNotConnected); + return promise.getFuture(); + } + + pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise)); + lock.unlock(); + sendCommand(Commands::newGetSchema(topicName, requestId)); + return promise.getFuture(); +} + void ClientConnection::closeSocket() { boost::system::error_code err; if (socket_) { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 0f322a8..c77ca78 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -169,6 +169,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId); + Future<Result, boost::optional<SchemaInfo>> newGetSchema(const 
std::string& topicName, + uint64_t requestId); + private: struct PendingRequestData { Promise<Result, ResponseData> promise; @@ -327,6 +330,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap; PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_; + typedef std::map<long, Promise<Result, boost::optional<SchemaInfo>>> PendingGetSchemaMap; + PendingGetSchemaMap pendingGetSchemaRequests_; + mutable std::mutex mutex_; typedef std::unique_lock<std::mutex> Lock; diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index a9c1653..ec9de8a 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -142,7 +142,7 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() { LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; } void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf, - CreateProducerCallback callback) { + CreateProducerCallback callback, bool autoDownloadSchema) { if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) { throw std::invalid_argument("Batching and chunking of messages can't be enabled together"); } @@ -159,9 +159,28 @@ void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfigura return; } } - lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, - std::placeholders::_2, topicName, conf, callback)); + + if (autoDownloadSchema) { + auto self = shared_from_this(); + auto confPtr = std::make_shared<ProducerConfiguration>(conf); + lookupServicePtr_->getSchema(topicName).addListener( + [self, topicName, confPtr, callback](Result res, boost::optional<SchemaInfo> topicSchema) { + if (res != ResultOk) { + callback(res, Producer()); + } + if (topicSchema) { + confPtr->setSchema(topicSchema.get()); + } + + 
self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, + std::placeholders::_2, topicName, *confPtr, callback)); + }); + } else { + lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( + std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, + std::placeholders::_2, topicName, conf, callback)); + } } void ClientImpl::handleCreateProducer(const Result result, const LookupDataResultPtr partitionMetadata, diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 8a39396..af2559d 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -68,8 +68,12 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> { bool poolConnections); ~ClientImpl(); + /** + * @param autoDownloadSchema When it is true, Before creating a producer, it will try to get the schema + * that exists for the topic. + */ void createProducerAsync(const std::string& topic, ProducerConfiguration conf, - CreateProducerCallback callback); + CreateProducerCallback callback, bool autoDownloadSchema = false); void subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback); diff --git a/lib/Commands.cc b/lib/Commands.cc index db993bd..bfd9a46 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -161,6 +161,21 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat return buffer; } +SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId) { + static BaseCommand cmd; + static std::mutex mutex; + std::lock_guard<std::mutex> lock(mutex); + cmd.set_type(BaseCommand::GET_SCHEMA); + + auto getSchema = cmd.mutable_getschema(); + getSchema->set_topic(topic); + getSchema->set_request_id(requestId); + + const SharedBuffer buffer = writeMessageWithSize(cmd); + cmd.clear_getschema(); + return buffer; +} + SharedBuffer 
Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) { static BaseCommand cmd; static std::mutex mutex; @@ -872,5 +887,6 @@ bool Commands::peerSupportsMultiMessageAcknowledgement(int32_t peerVersion) { bool Commands::peerSupportsJsonSchemaAvroFormat(int32_t peerVersion) { return peerVersion >= proto::v13; } bool Commands::peerSupportsGetOrCreateSchema(int32_t peerVersion) { return peerVersion >= proto::v15; } + } // namespace pulsar /* namespace pulsar */ diff --git a/lib/Commands.h b/lib/Commands.h index e7746d2..7d13e2a 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -91,6 +91,8 @@ class Commands { static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId, const std::string& listenerName); + static SharedBuffer newGetSchema(const std::string& topic, uint64_t requestId); + static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId, uint64_t sequenceId, ChecksumType checksumType, const proto::MessageMetadata& metadata, const SharedBuffer& payload); diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc index 8167b64..ae719a5 100644 --- a/lib/HTTPLookupService.cc +++ b/lib/HTTPLookupService.cc @@ -27,6 +27,7 @@ #include "ExecutorService.h" #include "LogUtils.h" #include "NamespaceName.h" +#include "SchemaUtils.h" #include "ServiceNameResolver.h" #include "TopicName.h" namespace ptree = boost::property_tree; @@ -143,6 +144,25 @@ Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync( return promise.getFuture(); } +Future<Result, boost::optional<SchemaInfo>> HTTPLookupService::getSchema(const TopicNamePtr &topicName) { + Promise<Result, boost::optional<SchemaInfo>> promise; + std::stringstream completeUrlStream; + + const auto &url = serviceNameResolver_.resolveHost(); + if (topicName->isV2Topic()) { + completeUrlStream << url << ADMIN_PATH_V2 << "schemas/" << topicName->getProperty() << '/' + << topicName->getNamespacePortion() << 
'/' << topicName->getEncodedLocalName() + << "/schema"; + } else { + completeUrlStream << url << ADMIN_PATH_V1 << "schemas/" << topicName->getProperty() << '/' + << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/' + << topicName->getEncodedLocalName() << "/schema"; + } + executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest, + shared_from_this(), promise, completeUrlStream.str())); + return promise.getFuture(); +} + static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void *responseDataPtr) { ((std::string *)responseDataPtr)->append((char *)contents, size * nmemb); return size * nmemb; @@ -161,6 +181,12 @@ void HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise } Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &responseData) { + long responseCode = -1; + return sendHTTPRequest(completeUrl, responseData, responseCode); +} + +Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &responseData, + long &responseCode) { uint16_t reqCount = 0; Result retResult = ResultOk; while (++reqCount <= MAX_HTTP_REDIRECTS) { @@ -253,9 +279,8 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string & // Make get call to server res = curl_easy_perform(handle); - long response_code = -1; - curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code); - LOG_INFO("Response received for url " << completeUrl << " response_code " << response_code + curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &responseCode); + LOG_INFO("Response received for url " << completeUrl << " responseCode " << responseCode << " curl res " << res); // Free header list @@ -263,12 +288,9 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string & switch (res) { case CURLE_OK: - long response_code; - curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code); - LOG_INFO("Response received for 
url " << completeUrl << " code " << response_code); - if (response_code == 200) { + if (responseCode == 200) { retResult = ResultOk; - } else if (needRedirection(response_code)) { + } else if (needRedirection(responseCode)) { char *url = NULL; curl_easy_getinfo(handle, CURLINFO_REDIRECT_URL, &url); LOG_INFO("Response from url " << completeUrl << " to new url " << url); @@ -302,7 +324,7 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string & break; } curl_easy_cleanup(handle); - if (!needRedirection(response_code)) { + if (!needRedirection(responseCode)) { break; } } @@ -409,4 +431,73 @@ void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std } } +void HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, const std::string completeUrl) { + std::string responseData; + long responseCode = -1; + Result result = sendHTTPRequest(completeUrl, responseData, responseCode); + + if (responseCode == 404) { + promise.setValue(boost::none); + } else if (result != ResultOk) { + promise.setFailed(result); + } else { + ptree::ptree root; + std::stringstream stream(responseData); + try { + ptree::read_json(stream, root); + } catch (ptree::json_parser_error &e) { + LOG_ERROR("Failed to parse json of Partition Metadata: " << e.what() + << "\nInput Json = " << responseData); + promise.setFailed(ResultInvalidMessage); + return; + } + const std::string defaultNotFoundString = "Not found"; + auto schemaTypeStr = root.get<std::string>("type", defaultNotFoundString); + if (schemaTypeStr == defaultNotFoundString) { + LOG_ERROR("malformed json! - type not present" << responseData); + promise.setFailed(ResultInvalidMessage); + return; + } + auto schemaData = root.get<std::string>("data", defaultNotFoundString); + if (schemaData == defaultNotFoundString) { + LOG_ERROR("malformed json! 
- data not present" << responseData); + promise.setFailed(ResultInvalidMessage); + return; + } + + auto schemaType = enumSchemaType(schemaTypeStr); + if (schemaType == KEY_VALUE) { + ptree::ptree kvRoot; + std::stringstream kvStream(schemaData); + try { + ptree::read_json(kvStream, kvRoot); + } catch (ptree::json_parser_error &e) { + LOG_ERROR("Failed to parse json of Partition Metadata: " << e.what() + << "\nInput Json = " << schemaData); + promise.setFailed(ResultInvalidMessage); + return; + } + std::stringstream keyStream; + ptree::write_json(keyStream, kvRoot.get_child("key"), false); + std::stringstream valueStream; + ptree::write_json(valueStream, kvRoot.get_child("value"), false); + auto keyData = keyStream.str(); + auto valueData = valueStream.str(); + // Remove the last line break. + keyData.pop_back(); + valueData.pop_back(); + schemaData = mergeKeyValueSchema(keyData, valueData); + } + + StringMap properties; + auto propertiesTree = root.get_child("properties"); + for (const auto &item : propertiesTree) { + properties[item.first] = item.second.get_value<std::string>(); + } + + SchemaInfo schemaInfo = SchemaInfo(schemaType, "", schemaData, properties); + promise.setValue(schemaInfo); + } +} + } // namespace pulsar diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h index 929d7ab..f76d3e6 100644 --- a/lib/HTTPLookupService.h +++ b/lib/HTTPLookupService.h @@ -28,6 +28,7 @@ namespace pulsar { class ServiceNameResolver; using NamespaceTopicsPromise = Promise<Result, NamespaceTopicsPtr>; using NamespaceTopicsPromisePtr = std::shared_ptr<NamespaceTopicsPromise>; +using GetSchemaPromise = Promise<Result, boost::optional<SchemaInfo>>; class HTTPLookupService : public LookupService, public std::enable_shared_from_this<HTTPLookupService> { class CurlInitializer { @@ -62,9 +63,12 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t void handleLookupHTTPRequest(LookupPromise, const std::string, RequestType); void 
handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise, const std::string completeUrl); + void handleGetSchemaHTTPRequest(GetSchemaPromise promise, const std::string completeUrl); Result sendHTTPRequest(std::string completeUrl, std::string& responseData); + Result sendHTTPRequest(std::string completeUrl, std::string& responseData, long& responseCode); + public: HTTPLookupService(ServiceNameResolver&, const ClientConfiguration&, const AuthenticationPtr&); @@ -72,6 +76,8 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr&) override; + Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override; + Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override; }; } // namespace pulsar diff --git a/lib/LookupService.h b/lib/LookupService.h index 6af290c..f1c5de8 100644 --- a/lib/LookupService.h +++ b/lib/LookupService.h @@ -20,7 +20,9 @@ #define PULSAR_CPP_LOOKUPSERVICE_H #include <pulsar/Result.h> +#include <pulsar/Schema.h> +#include <boost/optional.hpp> #include <memory> #include <ostream> #include <vector> @@ -72,6 +74,14 @@ class LookupService { */ virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) = 0; + /** + * returns current SchemaInfo {@link SchemaInfo} for a given topic. 
+ * + * @param topicName topic-name + * @return SchemaInfo + */ + virtual Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) = 0; + virtual ~LookupService() {} }; diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index 7d704ec..f16ff75 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -66,6 +66,12 @@ class RetryableLookupService : public LookupService, [this, nsName] { return lookupService_->getTopicsOfNamespaceAsync(nsName); }); } + Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override { + return executeAsync<boost::optional<SchemaInfo>>( + "get-schema" + topicName->toString(), + [this, topicName] { return lookupService_->getSchema(topicName); }); + } + template <typename T> Future<Result, T> executeAsync(const std::string& key, std::function<Future<Result, T>()> f) { Promise<Result, T> promise; diff --git a/lib/Schema.cc b/lib/Schema.cc index 300c91d..d77822a 100644 --- a/lib/Schema.cc +++ b/lib/Schema.cc @@ -25,7 +25,8 @@ #include <map> #include <memory> -#include "SharedBuffer.h" +#include "SchemaUtils.h" + using boost::property_tree::ptree; using boost::property_tree::read_json; using boost::property_tree::write_json; @@ -40,14 +41,6 @@ PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::KeyValueEncoding namespace pulsar { -static const std::string KEY_SCHEMA_NAME = "key.schema.name"; -static const std::string KEY_SCHEMA_TYPE = "key.schema.type"; -static const std::string KEY_SCHEMA_PROPS = "key.schema.properties"; -static const std::string VALUE_SCHEMA_NAME = "value.schema.name"; -static const std::string VALUE_SCHEMA_TYPE = "value.schema.type"; -static const std::string VALUE_SCHEMA_PROPS = "value.schema.properties"; -static const std::string KV_ENCODING_TYPE = "kv.encoding.type"; - PULSAR_PUBLIC const char *strEncodingType(KeyValueEncodingType encodingType) { switch (encodingType) { case KeyValueEncodingType::INLINE: @@ 
-112,6 +105,44 @@ PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType) { return "UnknownSchemaType"; } +PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr) { + if (schemaTypeStr == "NONE") { + return NONE; + } else if (schemaTypeStr == "STRING") { + return STRING; + } else if (schemaTypeStr == "INT8") { + return INT8; + } else if (schemaTypeStr == "INT16") { + return INT16; + } else if (schemaTypeStr == "INT32") { + return INT32; + } else if (schemaTypeStr == "INT64") { + return INT64; + } else if (schemaTypeStr == "FLOAT") { + return FLOAT; + } else if (schemaTypeStr == "DOUBLE") { + return DOUBLE; + } else if (schemaTypeStr == "BYTES") { + return BYTES; + } else if (schemaTypeStr == "JSON") { + return JSON; + } else if (schemaTypeStr == "PROTOBUF") { + return PROTOBUF; + } else if (schemaTypeStr == "AVRO") { + return AVRO; + } else if (schemaTypeStr == "AUTO_CONSUME") { + return AUTO_CONSUME; + } else if (schemaTypeStr == "AUTO_PUBLISH") { + return AUTO_PUBLISH; + } else if (schemaTypeStr == "KEY_VALUE") { + return KEY_VALUE; + } else if (schemaTypeStr == "PROTOBUF_NATIVE") { + return PROTOBUF_NATIVE; + } else { + throw std::invalid_argument("No match schema type: " + schemaTypeStr); + } +} + class PULSAR_PUBLIC SchemaInfoImpl { public: const std::string name_; @@ -134,18 +165,6 @@ SchemaInfo::SchemaInfo(SchemaType schemaType, const std::string &name, const std SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo &valueSchema, const KeyValueEncodingType &keyValueEncodingType) { - std::string keySchemaStr = keySchema.getSchema(); - std::string valueSchemaStr = valueSchema.getSchema(); - uint32_t keySize = keySchemaStr.size(); - uint32_t valueSize = valueSchemaStr.size(); - - auto buffSize = sizeof keySize + keySize + sizeof valueSize + valueSize; - SharedBuffer buffer = SharedBuffer::allocate(buffSize); - buffer.writeUnsignedInt(keySize == 0 ? 
INVALID_SIZE : static_cast<uint32_t>(keySize)); - buffer.write(keySchemaStr.c_str(), static_cast<uint32_t>(keySize)); - buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE : static_cast<uint32_t>(valueSize)); - buffer.write(valueSchemaStr.c_str(), static_cast<uint32_t>(valueSize)); - auto writeJson = [](const StringMap &properties) { ptree pt; for (auto &entry : properties) { @@ -167,8 +186,10 @@ SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo &valueSchem properties.emplace(VALUE_SCHEMA_PROPS, writeJson(valueSchema.getProperties())); properties.emplace(KV_ENCODING_TYPE, strEncodingType(keyValueEncodingType)); - impl_ = std::make_shared<SchemaInfoImpl>(KEY_VALUE, "KeyValue", std::string(buffer.data(), buffSize), - properties); + std::string keySchemaStr = keySchema.getSchema(); + std::string valueSchemaStr = valueSchema.getSchema(); + impl_ = std::make_shared<SchemaInfoImpl>(KEY_VALUE, "KeyValue", + mergeKeyValueSchema(keySchemaStr, valueSchemaStr), properties); } SchemaType SchemaInfo::getSchemaType() const { return impl_->type_; } diff --git a/lib/SchemaUtils.h b/lib/SchemaUtils.h new file mode 100644 index 0000000..a298a03 --- /dev/null +++ b/lib/SchemaUtils.h @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef SCHEMA_UTILS_HPP_ +#define SCHEMA_UTILS_HPP_ + +#include "SharedBuffer.h" + +namespace pulsar { + +static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF; +static const std::string KEY_SCHEMA_NAME = "key.schema.name"; +static const std::string KEY_SCHEMA_TYPE = "key.schema.type"; +static const std::string KEY_SCHEMA_PROPS = "key.schema.properties"; +static const std::string VALUE_SCHEMA_NAME = "value.schema.name"; +static const std::string VALUE_SCHEMA_TYPE = "value.schema.type"; +static const std::string VALUE_SCHEMA_PROPS = "value.schema.properties"; +static const std::string KV_ENCODING_TYPE = "kv.encoding.type"; + +/** + * Merge keySchemaData and valueSchemaData. + * @return + */ +static std::string mergeKeyValueSchema(const std::string& keySchemaData, const std::string& valueSchemaData) { + uint32_t keySize = keySchemaData.size(); + uint32_t valueSize = valueSchemaData.size(); + + auto buffSize = sizeof keySize + keySize + sizeof valueSize + valueSize; + SharedBuffer buffer = SharedBuffer::allocate(buffSize); + buffer.writeUnsignedInt(keySize == 0 ? INVALID_SIZE : static_cast<uint32_t>(keySize)); + buffer.write(keySchemaData.c_str(), static_cast<uint32_t>(keySize)); + buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE : static_cast<uint32_t>(valueSize)); + buffer.write(valueSchemaData.c_str(), static_cast<uint32_t>(valueSize)); + + return std::string(buffer.data(), buffSize); +} + +} // namespace pulsar + +#endif /* SCHEMA_UTILS_HPP_ */ diff --git a/test-conf/standalone-ssl.conf b/test-conf/standalone-ssl.conf index ee7c16c..1e54360 100644 --- a/test-conf/standalone-ssl.conf +++ b/test-conf/standalone-ssl.conf @@ -50,6 +50,9 @@ brokerShutdownTimeoutMs=3000 # Enable backlog quota check. 
Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true +# Enforce schema validation: if a producer does not carry a schema, it is rejected when it tries to connect to a topic that already has a schema. +isSchemaValidationEnforced=true + # How often to check for topics that have reached the quota backlogQuotaCheckIntervalInSeconds=60 diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index 5ac4122..e31246f 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -39,6 +39,9 @@ using namespace pulsar; DECLARE_LOG_OBJECT() +static std::string binaryLookupUrl = "pulsar://localhost:6650"; +static std::string httpLookupUrl = "http://localhost:8080"; + TEST(LookupServiceTest, basicLookup) { ExecutorServiceProviderPtr service = std::make_shared<ExecutorServiceProvider>(1); AuthenticationPtr authData = AuthFactory::Disabled(); @@ -274,3 +277,82 @@ TEST(LookupServiceTest, testTimeout) { ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0); } + +class LookupServiceTest : public ::testing::TestWithParam<std::string> { + public: + void TearDown() override { client_.close(); } + + protected: + Client client_{GetParam()}; +}; + +TEST_P(LookupServiceTest, testGetSchema) { + const std::string topic = "testGetSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); + std::string jsonSchema = + R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; + + StringMap properties; + properties.emplace("key1", "value1"); + properties.emplace("key2", "value2"); + + ProducerConfiguration producerConfiguration; + producerConfiguration.setSchema(SchemaInfo(SchemaType::JSON, "json", jsonSchema, properties)); + Producer producer; + ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); + + auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); + auto lookup = clientImplPtr->getLookup(); + + boost::optional<SchemaInfo> schemaInfo; + 
auto future = lookup->getSchema(TopicName::get(topic)); + ASSERT_EQ(ResultOk, future.get(schemaInfo)); + ASSERT_EQ(jsonSchema, schemaInfo->getSchema()); + ASSERT_EQ(SchemaType::JSON, schemaInfo->getSchemaType()); + ASSERT_EQ(properties, schemaInfo->getProperties()); +} + +TEST_P(LookupServiceTest, testGetSchemaNotFund) { + const std::string topic = + "testGetSchemaNotFund" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); + + Producer producer; + ASSERT_EQ(ResultOk, client_.createProducer(topic, producer)); + + auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); + auto lookup = clientImplPtr->getLookup(); + + boost::optional<SchemaInfo> schemaInfo; + auto future = lookup->getSchema(TopicName::get(topic)); + ASSERT_EQ(ResultOk, future.get(schemaInfo)); + ASSERT_FALSE(schemaInfo); +} + +TEST_P(LookupServiceTest, testGetKeyValueSchema) { + const std::string topic = + "testGetKeyValueSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); + StringMap properties; + properties.emplace("key1", "value1"); + properties.emplace("key2", "value2"); + std::string jsonSchema = + R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; + SchemaInfo keySchema(JSON, "key-json", jsonSchema, properties); + SchemaInfo valueSchema(JSON, "value-json", jsonSchema, properties); + SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE); + + ProducerConfiguration producerConfiguration; + producerConfiguration.setSchema(keyValueSchema); + Producer producer; + ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); + + auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); + auto lookup = clientImplPtr->getLookup(); + + boost::optional<SchemaInfo> schemaInfo; + auto future = lookup->getSchema(TopicName::get(topic)); + ASSERT_EQ(ResultOk, future.get(schemaInfo)); + ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo->getSchema()); + 
ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo->getSchemaType()); + ASSERT_FALSE(schemaInfo->getProperties().empty()); +} + +INSTANTIATE_TEST_CASE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLookupUrl, httpLookupUrl)); \ No newline at end of file diff --git a/tests/SchemaTest.cc b/tests/SchemaTest.cc index cf0f8c7..34e224e 100644 --- a/tests/SchemaTest.cc +++ b/tests/SchemaTest.cc @@ -19,12 +19,12 @@ #include <gtest/gtest.h> #include <pulsar/Client.h> +#include "PulsarFriend.h" #include "SharedBuffer.h" using namespace pulsar; static std::string lookupUrl = "pulsar://localhost:6650"; - static const std::string exampleSchema = R"({"type":"record","name":"Example","namespace":"test","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})"; @@ -50,11 +50,10 @@ TEST(SchemaTest, testSchema) { res = client.createProducer("topic-avro", producerConf, producer); ASSERT_EQ(ResultIncompatibleSchema, res); - // Creating producer with no schema on same topic should succeed - // because standalone broker is configured by default to not - // require the schema to be set + // Creating producer with no schema on same topic should fail. 
+ // Because we set broker config isSchemaValidationEnforced=true res = client.createProducer("topic-avro", producer); - ASSERT_EQ(ResultOk, res); + ASSERT_EQ(ResultIncompatibleSchema, res); ConsumerConfiguration consumerConf; Consumer consumer; @@ -154,3 +153,33 @@ TEST(SchemaTest, testValueSchemaIsEmpty) { int valueSchemaSize = buffer.readUnsignedInt(); ASSERT_EQ(valueSchemaSize, -1); } + +TEST(SchemaTest, testAutoDownloadSchema) { + const std::string topic = "testAutoPublicSchema" + std::to_string(time(nullptr)); + std::string jsonSchema = + R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; + SchemaInfo schema(JSON, "test-schema", jsonSchema); + + Client client(lookupUrl); + + ConsumerConfiguration consumerConfiguration; + consumerConfiguration.setSchema(schema); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", consumerConfiguration, consumer)); + + ProducerConfiguration producerConfiguration; + Producer producer; + + auto clientImplPtr = PulsarFriend::getClientImplPtr(client); + + Promise<Result, Producer> promise; + clientImplPtr->createProducerAsync(topic, producerConfiguration, WaitForCallbackValue<Producer>(promise), + true); + ASSERT_EQ(ResultOk, promise.getFuture().get(producer)); + + Message msg = MessageBuilder().setContent("content").build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + + ASSERT_EQ(ResultOk, consumer.receive(msg)); + ASSERT_EQ("content", msg.getDataAsString()); +}