This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push: new 4063562 Support get the SchemaInfo from a topic and the schema version (#257) 4063562 is described below commit 4063562688ba703a108916fbed2764a96a175a34 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Fri Apr 28 10:17:37 2023 +0800 Support get the SchemaInfo from a topic and the schema version (#257) ### Motivation Currently there is no public API to get the SchemaInfo from the broker. However, when a consumer tries to decode a message with an Avro schema, it needs the writer schema that was stored in the broker. The C++ client is not responsible for decoding the message, but extension libraries such as the Python client need to be able to get the writer schema from the broker. As a workaround, a REST request can be sent to query the schema, but that requires admin permission. ### Modification - Add the `Client::getSchemaInfoAsync` method to get the schema info asynchronously. When the schema of the given version does not exist, the callback is completed with `ResultTopicNotFound`. - Add the `Message::getLongSchemaVersion` method to get the schema version of a message as an `int64_t`. The existing `getSchemaVersion` method is hard to use because it returns a byte array that users must know how to decode. - Provide the `fromBigEndianBytes` and `toBigEndianBytes` functions to convert between the schema version's byte array and its 64-bit integer value. Add `Int64SerDesTest` to test them. - Fix `LookupServiceTest`, which initialized `client_` only once (even though it used `GetParam()`), so the HTTP URL was never tested. - Add `testGetSchemaByVersion` to test `getSchemaInfoAsync`. 
--- include/pulsar/Client.h | 10 ++++ include/pulsar/Message.h | 9 ++- lib/BinaryProtoLookupService.cc | 19 ++++--- lib/BinaryProtoLookupService.h | 6 +- lib/Client.cc | 10 ++++ lib/ClientConnection.cc | 18 +++--- lib/ClientConnection.h | 6 +- lib/ClientImpl.cc | 12 ++-- lib/Commands.cc | 6 +- lib/Commands.h | 3 +- lib/HTTPLookupService.cc | 12 +++- lib/HTTPLookupService.h | 4 +- lib/Int64SerDes.h | 42 ++++++++++++++ lib/LookupService.h | 7 ++- lib/Message.cc | 5 ++ lib/RetryableLookupService.h | 8 +-- tests/Int64SerDesTest.cc | 46 +++++++++++++++ tests/LookupServiceTest.cc | 121 +++++++++++++++++++++++++++++++++++----- 18 files changed, 283 insertions(+), 61 deletions(-) diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index a8349eb..3514934 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -404,6 +404,16 @@ class PULSAR_PUBLIC Client { */ uint64_t getNumberOfConsumers(); + /** + * Asynchronously get the SchemaInfo of a topic and a specific version. + * + * @topic the topic name + * @version the schema version, see Message::getLongSchemaVersion + * @callback the callback that is triggered when the SchemaInfo is retrieved successfully or not + */ + void getSchemaInfoAsync(const std::string& topic, int64_t version, + std::function<void(Result, const SchemaInfo&)> callback); + private: Client(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, bool poolConnections); diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h index 47bc974..fe99131 100644 --- a/include/pulsar/Message.h +++ b/include/pulsar/Message.h @@ -177,7 +177,14 @@ class PULSAR_PUBLIC Message { bool hasSchemaVersion() const; /** - * Get the schema version + * Get the schema version. + * + * @return the the schema version on success or -1 if the message does not have the schema version + */ + int64_t getLongSchemaVersion() const; + + /** + * Get the schema version of the raw bytes. 
*/ const std::string& getSchemaVersion() const; diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc index 0b23493..f563f63 100644 --- a/lib/BinaryProtoLookupService.cc +++ b/lib/BinaryProtoLookupService.cc @@ -164,9 +164,9 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac return promise->getFuture(); } -Future<Result, boost::optional<SchemaInfo>> BinaryProtoLookupService::getSchema( - const TopicNamePtr& topicName) { - GetSchemaPromisePtr promise = std::make_shared<Promise<Result, boost::optional<SchemaInfo>>>(); +Future<Result, SchemaInfo> BinaryProtoLookupService::getSchema(const TopicNamePtr& topicName, + const std::string& version) { + GetSchemaPromisePtr promise = std::make_shared<Promise<Result, SchemaInfo>>(); if (!topicName) { promise->setFailed(ResultInvalidTopicName); @@ -174,13 +174,13 @@ Future<Result, boost::optional<SchemaInfo>> BinaryProtoLookupService::getSchema( } cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost()) .addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, topicName->toString(), - std::placeholders::_1, std::placeholders::_2, promise)); + version, std::placeholders::_1, std::placeholders::_2, promise)); return promise->getFuture(); } -void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, Result result, - const ClientConnectionWeakPtr& clientCnx, +void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, const std::string& version, + Result result, const ClientConnectionWeakPtr& clientCnx, GetSchemaPromisePtr promise) { if (result != ResultOk) { promise->setFailed(result); @@ -189,10 +189,11 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName ClientConnectionPtr conn = clientCnx.lock(); uint64_t requestId = newRequestId(); - LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topicName); + LOG_DEBUG("sendGetSchemaRequest. 
requestId: " << requestId << " topicName: " << topicName + << " version: " << version); - conn->newGetSchema(topicName, requestId) - .addListener([promise](Result result, boost::optional<SchemaInfo> schemaInfo) { + conn->newGetSchema(topicName, version, requestId) + .addListener([promise](Result result, SchemaInfo schemaInfo) { if (result != ResultOk) { promise->setFailed(result); return; diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index f8c91e6..a3c059e 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -34,7 +34,7 @@ class ConnectionPool; class LookupDataResult; class ServiceNameResolver; using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, NamespaceTopicsPtr>>; -using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, boost::optional<SchemaInfo>>>; +using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>; class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { public: @@ -52,7 +52,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override; - Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override; + Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override; protected: // Mark findBroker as protected to make it accessible from test. 
@@ -80,7 +80,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { Result result, const ClientConnectionWeakPtr& clientCnx, NamespaceTopicsPromisePtr promise); - void sendGetSchemaRequest(const std::string& topiName, Result result, + void sendGetSchemaRequest(const std::string& topicName, const std::string& version, Result result, const ClientConnectionWeakPtr& clientCnx, GetSchemaPromisePtr promise); void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr, diff --git a/lib/Client.cc b/lib/Client.cc index e132189..48c4a67 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -23,7 +23,10 @@ #include <utility> #include "ClientImpl.h" +#include "Int64SerDes.h" #include "LogUtils.h" +#include "LookupService.h" +#include "TopicName.h" #include "Utils.h" DECLARE_LOG_OBJECT() @@ -191,4 +194,11 @@ void Client::shutdown() { impl_->shutdown(); } uint64_t Client::getNumberOfProducers() { return impl_->getNumberOfProducers(); } uint64_t Client::getNumberOfConsumers() { return impl_->getNumberOfConsumers(); } + +void Client::getSchemaInfoAsync(const std::string& topic, int64_t version, + std::function<void(Result, const SchemaInfo&)> callback) { + impl_->getLookup() + ->getSchema(TopicName::get(topic), (version >= 0) ? 
toBigEndianBytes(version) : "") + .addListener(callback); +} } // namespace pulsar diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 332a695..72b9c8e 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1311,10 +1311,10 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace( return promise.getFuture(); } -Future<Result, boost::optional<SchemaInfo>> ClientConnection::newGetSchema(const std::string& topicName, - uint64_t requestId) { +Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& topicName, + const std::string& version, uint64_t requestId) { Lock lock(mutex_); - Promise<Result, boost::optional<SchemaInfo>> promise; + Promise<Result, SchemaInfo> promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString_ << "Client is not connected to the broker"); @@ -1324,7 +1324,7 @@ Future<Result, boost::optional<SchemaInfo>> ClientConnection::newGetSchema(const pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise)); lock.unlock(); - sendCommand(Commands::newGetSchema(topicName, requestId)); + sendCommand(Commands::newGetSchema(topicName, version, requestId)); return promise.getFuture(); } @@ -1758,21 +1758,19 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp Lock lock(mutex_); auto it = pendingGetSchemaRequests_.find(response.request_id()); if (it != pendingGetSchemaRequests_.end()) { - Promise<Result, boost::optional<SchemaInfo>> getSchemaPromise = it->second; + Promise<Result, SchemaInfo> getSchemaPromise = it->second; pendingGetSchemaRequests_.erase(it); lock.unlock(); if (response.has_error_code()) { - if (response.error_code() == proto::TopicNotFound) { - getSchemaPromise.setValue(boost::none); - } else { - Result result = getResult(response.error_code(), response.error_message()); + Result result = getResult(response.error_code(), response.error_message()); + if (response.error_code() != proto::TopicNotFound) { LOG_WARN(cnxString_ 
<< "Received error GetSchemaResponse from server " << result << (response.has_error_message() ? (" (" + response.error_message() + ")") : "") << " -- req_id: " << response.request_id()); - getSchemaPromise.setFailed(result); } + getSchemaPromise.setFailed(result); return; } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 451b0ce..24a43d5 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -185,8 +185,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId); - Future<Result, boost::optional<SchemaInfo>> newGetSchema(const std::string& topicName, - uint64_t requestId); + Future<Result, SchemaInfo> newGetSchema(const std::string& topicName, const std::string& version, + uint64_t requestId); private: struct PendingRequestData { @@ -346,7 +346,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap; PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_; - typedef std::map<long, Promise<Result, boost::optional<SchemaInfo>>> PendingGetSchemaMap; + typedef std::map<long, Promise<Result, SchemaInfo>> PendingGetSchemaMap; PendingGetSchemaMap pendingGetSchemaRequests_; mutable std::mutex mutex_; diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index d04289f..a8fc24c 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -166,19 +166,17 @@ void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfigura if (autoDownloadSchema) { auto self = shared_from_this(); - auto confPtr = std::make_shared<ProducerConfiguration>(conf); lookupServicePtr_->getSchema(topicName).addListener( - [self, topicName, confPtr, callback](Result res, boost::optional<SchemaInfo> topicSchema) { + [self, topicName, callback](Result res, SchemaInfo topicSchema) { if (res != ResultOk) { callback(res, Producer()); + return; } - if 
(topicSchema) { - confPtr->setSchema(topicSchema.get()); - } - + ProducerConfiguration conf; + conf.setSchema(topicSchema); self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, - std::placeholders::_2, topicName, *confPtr, callback)); + std::placeholders::_2, topicName, conf, callback)); }); } else { lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( diff --git a/lib/Commands.cc b/lib/Commands.cc index 8af61e7..cec9d3b 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -161,7 +161,8 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat return buffer; } -SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId) { +SharedBuffer Commands::newGetSchema(const std::string& topic, const std::string& version, + uint64_t requestId) { static BaseCommand cmd; static std::mutex mutex; std::lock_guard<std::mutex> lock(mutex); @@ -170,6 +171,9 @@ SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId auto getSchema = cmd.mutable_getschema(); getSchema->set_topic(topic); getSchema->set_request_id(requestId); + if (!version.empty()) { + getSchema->set_schema_version(version); + } const SharedBuffer buffer = writeMessageWithSize(cmd); cmd.clear_getschema(); diff --git a/lib/Commands.h b/lib/Commands.h index 5d24764..65a6406 100644 --- a/lib/Commands.h +++ b/lib/Commands.h @@ -92,7 +92,8 @@ class Commands { static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId, const std::string& listenerName); - static SharedBuffer newGetSchema(const std::string& topic, uint64_t requestId); + static SharedBuffer newGetSchema(const std::string& topic, const std::string& version, + uint64_t requestId); static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId, uint64_t sequenceId, ChecksumType checksumType, diff --git 
a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc index 920592d..1956e01 100644 --- a/lib/HTTPLookupService.cc +++ b/lib/HTTPLookupService.cc @@ -25,6 +25,7 @@ #include <boost/property_tree/ptree.hpp> #include "ExecutorService.h" +#include "Int64SerDes.h" #include "LogUtils.h" #include "NamespaceName.h" #include "SchemaUtils.h" @@ -157,8 +158,9 @@ Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync( return promise.getFuture(); } -Future<Result, boost::optional<SchemaInfo>> HTTPLookupService::getSchema(const TopicNamePtr &topicName) { - Promise<Result, boost::optional<SchemaInfo>> promise; +Future<Result, SchemaInfo> HTTPLookupService::getSchema(const TopicNamePtr &topicName, + const std::string &version) { + Promise<Result, SchemaInfo> promise; std::stringstream completeUrlStream; const auto &url = serviceNameResolver_.resolveHost(); @@ -171,6 +173,10 @@ Future<Result, boost::optional<SchemaInfo>> HTTPLookupService::getSchema(const T << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/' << topicName->getEncodedLocalName() << "/schema"; } + if (!version.empty()) { + completeUrlStream << "/" << fromBigEndianBytes(version); + } + executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest, shared_from_this(), promise, completeUrlStream.str())); return promise.getFuture(); @@ -450,7 +456,7 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, con Result result = sendHTTPRequest(completeUrl, responseData, responseCode); if (responseCode == 404) { - promise.setValue(boost::none); + promise.setFailed(ResultTopicNotFound); } else if (result != ResultOk) { promise.setFailed(result); } else { diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h index 8dc195c..a1e12fc 100644 --- a/lib/HTTPLookupService.h +++ b/lib/HTTPLookupService.h @@ -28,7 +28,7 @@ namespace pulsar { class ServiceNameResolver; using NamespaceTopicsPromise = Promise<Result, 
NamespaceTopicsPtr>; using NamespaceTopicsPromisePtr = std::shared_ptr<NamespaceTopicsPromise>; -using GetSchemaPromise = Promise<Result, boost::optional<SchemaInfo>>; +using GetSchemaPromise = Promise<Result, SchemaInfo>; class HTTPLookupService : public LookupService, public std::enable_shared_from_this<HTTPLookupService> { class CurlInitializer { @@ -77,7 +77,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr&) override; - Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override; + Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override; Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync( const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override; diff --git a/lib/Int64SerDes.h b/lib/Int64SerDes.h new file mode 100644 index 0000000..dbc5d8a --- /dev/null +++ b/lib/Int64SerDes.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#pragma once + +#include <stdint.h> + +#include <boost/asio.hpp> // for ntohl + +namespace pulsar { + +inline int64_t fromBigEndianBytes(const std::string& bytes) { + const auto int32Array = reinterpret_cast<const uint32_t*>(bytes.c_str()); + return (static_cast<int64_t>(ntohl(int32Array[0])) << 32) + static_cast<int64_t>(ntohl(int32Array[1])); +} + +inline std::string toBigEndianBytes(int64_t value) { + union { + char bytes[8]; + uint32_t int32Array[2]; + } u; + u.int32Array[0] = htonl(static_cast<int32_t>(value >> 32)); + u.int32Array[1] = htonl(static_cast<int32_t>(value & 0xFFFFFFFF)); + return {u.bytes, 8}; +} + +} // namespace pulsar diff --git a/lib/LookupService.h b/lib/LookupService.h index 84dc37c..7401121 100644 --- a/lib/LookupService.h +++ b/lib/LookupService.h @@ -22,7 +22,6 @@ #include <pulsar/Result.h> #include <pulsar/Schema.h> -#include <boost/optional.hpp> #include <memory> #include <ostream> #include <vector> @@ -77,12 +76,14 @@ class LookupService { const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) = 0; /** - * returns current SchemaInfo {@link SchemaInfo} for a given topic. + * Get the SchemaInfo for a given topic and a specific schema version. 
* * @param topicName topic-name + * @param version the schema version byte array, if it's empty, use the latest version * @return SchemaInfo */ - virtual Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) = 0; + virtual Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, + const std::string& version = "") = 0; virtual ~LookupService() {} }; diff --git a/lib/Message.cc b/lib/Message.cc index 98364f1..bfd65f4 100644 --- a/lib/Message.cc +++ b/lib/Message.cc @@ -23,6 +23,7 @@ #include <iostream> +#include "Int64SerDes.h" #include "KeyValueImpl.h" #include "MessageImpl.h" #include "PulsarApi.pb.h" @@ -184,6 +185,10 @@ bool Message::hasSchemaVersion() const { return false; } +int64_t Message::getLongSchemaVersion() const { + return (impl_ && impl_->hasSchemaVersion()) ? fromBigEndianBytes(impl_->getSchemaVersion()) : -1L; +} + const std::string& Message::getSchemaVersion() const { if (!impl_) { return emptyString; diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h index 01dd82b..c8fdaab 100644 --- a/lib/RetryableLookupService.h +++ b/lib/RetryableLookupService.h @@ -67,10 +67,10 @@ class RetryableLookupService : public LookupService, [this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); }); } - Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override { - return executeAsync<boost::optional<SchemaInfo>>( - "get-schema" + topicName->toString(), - [this, topicName] { return lookupService_->getSchema(topicName); }); + Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override { + return executeAsync<SchemaInfo>("get-schema" + topicName->toString(), [this, topicName, version] { + return lookupService_->getSchema(topicName, version); + }); } template <typename T> diff --git a/tests/Int64SerDesTest.cc b/tests/Int64SerDesTest.cc new file mode 100644 index 0000000..7dc662a --- /dev/null +++ 
b/tests/Int64SerDesTest.cc @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include <gtest/gtest.h> + +#include "lib/Int64SerDes.h" + +using namespace pulsar; + +TEST(Int64SerDes, testNormal) { + int64_t x = 0x0102030405060708L; + const auto bytes = toBigEndianBytes(x); + ASSERT_EQ(bytes.size(), 8); + for (int i = 0; i < 8; i++) { + ASSERT_EQ(bytes[i], i + 1); + } + int64_t y = fromBigEndianBytes(bytes.data()); + ASSERT_EQ(x, y); +} + +TEST(Int64SerDes, testOverflow) { + int64_t x = 0x8000000000000000L; + const auto bytes = toBigEndianBytes(x); + ASSERT_EQ(bytes.size(), 8); + ASSERT_EQ(static_cast<unsigned char>(bytes[0]), 0x80); + for (int i = 1; i < 8; i++) { + ASSERT_EQ(bytes[i], 0x00); + } + int64_t y = fromBigEndianBytes(bytes); + ASSERT_EQ(x, y); +} diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc index f7ffc69..7712c4e 100644 --- a/tests/LookupServiceTest.cc +++ b/tests/LookupServiceTest.cc @@ -26,6 +26,8 @@ #include <algorithm> #include <boost/exception/all.hpp> +#include <future> +#include <stdexcept> #include "HttpHelper.h" #include "PulsarFriend.h" @@ -42,6 +44,8 @@ DECLARE_LOG_OBJECT() static std::string binaryLookupUrl = "pulsar://localhost:6650"; static 
std::string httpLookupUrl = "http://localhost:8080"; +extern std::string unique_str(); + TEST(LookupServiceTest, basicLookup) { ExecutorServiceProviderPtr service = std::make_shared<ExecutorServiceProvider>(1); AuthenticationPtr authData = AuthFactory::Disabled(); @@ -222,10 +226,11 @@ TEST(LookupServiceTest, testTimeout) { class LookupServiceTest : public ::testing::TestWithParam<std::string> { public: + void SetUp() override { client_ = Client{GetParam()}; } void TearDown() override { client_.close(); } protected: - Client client_{GetParam()}; + Client client_{httpLookupUrl}; }; TEST_P(LookupServiceTest, basicGetNamespaceTopics) { @@ -289,15 +294,15 @@ TEST_P(LookupServiceTest, testGetSchema) { auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); auto lookup = clientImplPtr->getLookup(); - boost::optional<SchemaInfo> schemaInfo; + SchemaInfo schemaInfo; auto future = lookup->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); - ASSERT_EQ(jsonSchema, schemaInfo->getSchema()); - ASSERT_EQ(SchemaType::JSON, schemaInfo->getSchemaType()); - ASSERT_EQ(properties, schemaInfo->getProperties()); + ASSERT_EQ(jsonSchema, schemaInfo.getSchema()); + ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType()); + ASSERT_EQ(properties, schemaInfo.getProperties()); } -TEST_P(LookupServiceTest, testGetSchemaNotFund) { +TEST_P(LookupServiceTest, testGetSchemaNotFound) { const std::string topic = "testGetSchemaNotFund" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); @@ -307,10 +312,9 @@ TEST_P(LookupServiceTest, testGetSchemaNotFund) { auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); auto lookup = clientImplPtr->getLookup(); - boost::optional<SchemaInfo> schemaInfo; + SchemaInfo schemaInfo; auto future = lookup->getSchema(TopicName::get(topic)); - ASSERT_EQ(ResultOk, future.get(schemaInfo)); - ASSERT_FALSE(schemaInfo); + ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo)); } TEST_P(LookupServiceTest, testGetKeyValueSchema) { 
@@ -333,15 +337,104 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) { auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); auto lookup = clientImplPtr->getLookup(); - boost::optional<SchemaInfo> schemaInfo; + SchemaInfo schemaInfo; auto future = lookup->getSchema(TopicName::get(topic)); ASSERT_EQ(ResultOk, future.get(schemaInfo)); - ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo->getSchema()); - ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo->getSchemaType()); - ASSERT_FALSE(schemaInfo->getProperties().empty()); + ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema()); + ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType()); + ASSERT_FALSE(schemaInfo.getProperties().empty()); +} + +TEST_P(LookupServiceTest, testGetSchemaByVersion) { + const auto topic = "testGetSchemaByVersion" + unique_str() + GetParam().substr(0, 4); + const std::string schema1 = R"({ + "type": "record", + "name": "User", + "namespace": "test", + "fields": [ + {"name": "name", "type": ["null", "string"]}, + {"name": "age", "type": "int"} + ] +})"; + const std::string schema2 = R"({ + "type": "record", + "name": "User", + "namespace": "test", + "fields": [ + {"name": "age", "type": "int"}, + {"name": "name", "type": ["null", "string"]} + ] +})"; + ProducerConfiguration producerConf1; + producerConf1.setSchema(SchemaInfo{AVRO, "Avro", schema1}); + Producer producer1; + ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf1, producer1)); + ProducerConfiguration producerConf2; + producerConf2.setSchema(SchemaInfo{AVRO, "Avro", schema2}); + Producer producer2; + ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf2, producer2)); + + // Though these messages are invalid, the C++ client can send them successfully + producer1.send(MessageBuilder().setContent("msg0").build()); + producer2.send(MessageBuilder().setContent("msg1").build()); + + ConsumerConfiguration consumerConf; + consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest); + Consumer 
consumer; + ASSERT_EQ(ResultOk, client_.subscribe(topic, "sub", consumerConf, consumer)); + + Message msg1; + ASSERT_EQ(ResultOk, consumer.receive(msg1, 3000)); + Message msg2; + ASSERT_EQ(ResultOk, consumer.receive(msg2, 3000)); + + auto getSchemaInfo = [this](const std::string& topic, int64_t version) { + std::promise<SchemaInfo> p; + client_.getSchemaInfoAsync(topic, version, [&p](Result result, const SchemaInfo& info) { + if (result == ResultOk) { + p.set_value(info); + } else { + p.set_exception(std::make_exception_ptr(std::runtime_error(strResult(result)))); + } + }); + return p.get_future().get(); + }; + { + ASSERT_EQ(msg1.getLongSchemaVersion(), 0); + const auto info = getSchemaInfo(topic, 0); + ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO); + ASSERT_EQ(info.getSchema(), schema1); + } + { + ASSERT_EQ(msg2.getLongSchemaVersion(), 1); + const auto info = getSchemaInfo(topic, 1); + ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO); + ASSERT_EQ(info.getSchema(), schema2); + } + { + const auto info = getSchemaInfo(topic, -1); + ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO); + ASSERT_EQ(info.getSchema(), schema2); + } + try { + getSchemaInfo(topic, 2); + FAIL(); + } catch (const std::runtime_error& e) { + ASSERT_EQ(std::string{e.what()}, strResult(ResultTopicNotFound)); + } + try { + getSchemaInfo(topic + "-not-exist", 0); + FAIL(); + } catch (const std::runtime_error& e) { + ASSERT_EQ(std::string{e.what()}, strResult(ResultTopicNotFound)); + } + + consumer.close(); + producer1.close(); + producer2.close(); } -INSTANTIATE_TEST_CASE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLookupUrl, httpLookupUrl)); +INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLookupUrl, httpLookupUrl)); class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupService { public: