This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 4063562  Support get the SchemaInfo from a topic and the schema 
version (#257)
4063562 is described below

commit 4063562688ba703a108916fbed2764a96a175a34
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Fri Apr 28 10:17:37 2023 +0800

    Support get the SchemaInfo from a topic and the schema version (#257)
    
    ### Motivation
    
    Currently there is no public API to get the SchemaInfo from the broker.
    However, when a consumer decodes a message that uses an Avro schema, the
    writer schema stored in the broker is required. The C++ client itself
    is not responsible for decoding messages, but libraries built on top of
    it, such as the Python client, need a way to get the writer schema from
    the broker. As a workaround, a REST request can be sent to query the
    schema, but that requires admin permission.
    
    ### Modification
    
    - Add the `Client::getSchemaInfoAsync` method to get the schema info
      asynchronously. When the topic or the schema of the given version does
      not exist, the callback is invoked with `ResultTopicNotFound` rather
      than a valid `SchemaInfo`.
    - Add the `Message::getLongSchemaVersion` method to get the schema
      version of a message. The existing `getSchemaVersion` method is hard
      to use because it returns a byte array, which users need to know how
      to decode.
    - Provide `fromBigEndianBytes` and `toBigEndianBytes` functions (in
      `Int64SerDes.h`) to convert between a byte array and the long value
      of the schema version. Add `Int64SerDesTest` to test them.
    - Fix the `LookupServiceTest` fixture, which initialized `client_` only
      once (even though it used `GetParam()`), so the HTTP URL was never tested.
    - Add `testGetSchemaByVersion` to test `getSchemaInfoAsync`.
---
 include/pulsar/Client.h         |  10 ++++
 include/pulsar/Message.h        |   9 ++-
 lib/BinaryProtoLookupService.cc |  19 ++++---
 lib/BinaryProtoLookupService.h  |   6 +-
 lib/Client.cc                   |  10 ++++
 lib/ClientConnection.cc         |  18 +++---
 lib/ClientConnection.h          |   6 +-
 lib/ClientImpl.cc               |  12 ++--
 lib/Commands.cc                 |   6 +-
 lib/Commands.h                  |   3 +-
 lib/HTTPLookupService.cc        |  12 +++-
 lib/HTTPLookupService.h         |   4 +-
 lib/Int64SerDes.h               |  42 ++++++++++++++
 lib/LookupService.h             |   7 ++-
 lib/Message.cc                  |   5 ++
 lib/RetryableLookupService.h    |   8 +--
 tests/Int64SerDesTest.cc        |  46 +++++++++++++++
 tests/LookupServiceTest.cc      | 121 +++++++++++++++++++++++++++++++++++-----
 18 files changed, 283 insertions(+), 61 deletions(-)

diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h
index a8349eb..3514934 100644
--- a/include/pulsar/Client.h
+++ b/include/pulsar/Client.h
@@ -404,6 +404,16 @@ class PULSAR_PUBLIC Client {
      */
     uint64_t getNumberOfConsumers();
 
+    /**
+     * Asynchronously get the SchemaInfo of a topic and a specific version.
+     *
+     * @param topic the topic name
+     * @param version the schema version, see Message::getLongSchemaVersion
+     * @param callback the callback that is triggered when the SchemaInfo is 
retrieved successfully or not
+     */
+    void getSchemaInfoAsync(const std::string& topic, int64_t version,
+                            std::function<void(Result, const SchemaInfo&)> 
callback);
+
    private:
     Client(const std::string& serviceUrl, const ClientConfiguration& 
clientConfiguration,
            bool poolConnections);
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 47bc974..fe99131 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -177,7 +177,14 @@ class PULSAR_PUBLIC Message {
     bool hasSchemaVersion() const;
 
     /**
-     * Get the schema version
+     * Get the schema version.
+     *
+     * @return the schema version on success or -1 if the message does not 
have the schema version
+     */
+    int64_t getLongSchemaVersion() const;
+
+    /**
+     * Get the schema version of the raw bytes.
      */
     const std::string& getSchemaVersion() const;
 
diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index 0b23493..f563f63 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -164,9 +164,9 @@ Future<Result, NamespaceTopicsPtr> 
BinaryProtoLookupService::getTopicsOfNamespac
     return promise->getFuture();
 }
 
-Future<Result, boost::optional<SchemaInfo>> 
BinaryProtoLookupService::getSchema(
-    const TopicNamePtr& topicName) {
-    GetSchemaPromisePtr promise = std::make_shared<Promise<Result, 
boost::optional<SchemaInfo>>>();
+Future<Result, SchemaInfo> BinaryProtoLookupService::getSchema(const 
TopicNamePtr& topicName,
+                                                               const 
std::string& version) {
+    GetSchemaPromisePtr promise = std::make_shared<Promise<Result, 
SchemaInfo>>();
 
     if (!topicName) {
         promise->setFailed(ResultInvalidTopicName);
@@ -174,13 +174,13 @@ Future<Result, boost::optional<SchemaInfo>> 
BinaryProtoLookupService::getSchema(
     }
     cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
         
.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, 
topicName->toString(),
-                               std::placeholders::_1, std::placeholders::_2, 
promise));
+                               version, std::placeholders::_1, 
std::placeholders::_2, promise));
 
     return promise->getFuture();
 }
 
-void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& 
topicName, Result result,
-                                                    const 
ClientConnectionWeakPtr& clientCnx,
+void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& 
topicName, const std::string& version,
+                                                    Result result, const 
ClientConnectionWeakPtr& clientCnx,
                                                     GetSchemaPromisePtr 
promise) {
     if (result != ResultOk) {
         promise->setFailed(result);
@@ -189,10 +189,11 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const 
std::string& topicName
 
     ClientConnectionPtr conn = clientCnx.lock();
     uint64_t requestId = newRequestId();
-    LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: 
" << topicName);
+    LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: 
" << topicName
+                                                  << " version: " << version);
 
-    conn->newGetSchema(topicName, requestId)
-        .addListener([promise](Result result, boost::optional<SchemaInfo> 
schemaInfo) {
+    conn->newGetSchema(topicName, version, requestId)
+        .addListener([promise](Result result, SchemaInfo schemaInfo) {
             if (result != ResultOk) {
                 promise->setFailed(result);
                 return;
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index f8c91e6..a3c059e 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -34,7 +34,7 @@ class ConnectionPool;
 class LookupDataResult;
 class ServiceNameResolver;
 using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, 
NamespaceTopicsPtr>>;
-using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, 
boost::optional<SchemaInfo>>>;
+using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;
 
 class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
    public:
@@ -52,7 +52,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public 
LookupService {
     Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
         const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) 
override;
 
-    Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& 
topicName) override;
+    Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const 
std::string& version) override;
 
    protected:
     // Mark findBroker as protected to make it accessible from test.
@@ -80,7 +80,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public 
LookupService {
                                          Result result, const 
ClientConnectionWeakPtr& clientCnx,
                                          NamespaceTopicsPromisePtr promise);
 
-    void sendGetSchemaRequest(const std::string& topiName, Result result,
+    void sendGetSchemaRequest(const std::string& topicName, const std::string& 
version, Result result,
                               const ClientConnectionWeakPtr& clientCnx, 
GetSchemaPromisePtr promise);
 
     void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr 
topicsPtr,
diff --git a/lib/Client.cc b/lib/Client.cc
index e132189..48c4a67 100644
--- a/lib/Client.cc
+++ b/lib/Client.cc
@@ -23,7 +23,10 @@
 #include <utility>
 
 #include "ClientImpl.h"
+#include "Int64SerDes.h"
 #include "LogUtils.h"
+#include "LookupService.h"
+#include "TopicName.h"
 #include "Utils.h"
 
 DECLARE_LOG_OBJECT()
@@ -191,4 +194,11 @@ void Client::shutdown() { impl_->shutdown(); }
 
 uint64_t Client::getNumberOfProducers() { return 
impl_->getNumberOfProducers(); }
 uint64_t Client::getNumberOfConsumers() { return 
impl_->getNumberOfConsumers(); }
+
+void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
+                                std::function<void(Result, const SchemaInfo&)> 
callback) {
+    impl_->getLookup()
+        ->getSchema(TopicName::get(topic), (version >= 0) ? 
toBigEndianBytes(version) : "")
+        .addListener(callback);
+}
 }  // namespace pulsar
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 332a695..72b9c8e 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1311,10 +1311,10 @@ Future<Result, NamespaceTopicsPtr> 
ClientConnection::newGetTopicsOfNamespace(
     return promise.getFuture();
 }
 
-Future<Result, boost::optional<SchemaInfo>> 
ClientConnection::newGetSchema(const std::string& topicName,
-                                                                           
uint64_t requestId) {
+Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& 
topicName,
+                                                          const std::string& 
version, uint64_t requestId) {
     Lock lock(mutex_);
-    Promise<Result, boost::optional<SchemaInfo>> promise;
+    Promise<Result, SchemaInfo> promise;
     if (isClosed()) {
         lock.unlock();
         LOG_ERROR(cnxString_ << "Client is not connected to the broker");
@@ -1324,7 +1324,7 @@ Future<Result, boost::optional<SchemaInfo>> 
ClientConnection::newGetSchema(const
 
     pendingGetSchemaRequests_.insert(std::make_pair(requestId, promise));
     lock.unlock();
-    sendCommand(Commands::newGetSchema(topicName, requestId));
+    sendCommand(Commands::newGetSchema(topicName, version, requestId));
     return promise.getFuture();
 }
 
@@ -1758,21 +1758,19 @@ void ClientConnection::handleGetSchemaResponse(const 
proto::CommandGetSchemaResp
     Lock lock(mutex_);
     auto it = pendingGetSchemaRequests_.find(response.request_id());
     if (it != pendingGetSchemaRequests_.end()) {
-        Promise<Result, boost::optional<SchemaInfo>> getSchemaPromise = 
it->second;
+        Promise<Result, SchemaInfo> getSchemaPromise = it->second;
         pendingGetSchemaRequests_.erase(it);
         lock.unlock();
 
         if (response.has_error_code()) {
-            if (response.error_code() == proto::TopicNotFound) {
-                getSchemaPromise.setValue(boost::none);
-            } else {
-                Result result = getResult(response.error_code(), 
response.error_message());
+            Result result = getResult(response.error_code(), 
response.error_message());
+            if (response.error_code() != proto::TopicNotFound) {
                 LOG_WARN(cnxString_ << "Received error GetSchemaResponse from 
server " << result
                                     << (response.has_error_message() ? (" (" + 
response.error_message() + ")")
                                                                      : "")
                                     << " -- req_id: " << 
response.request_id());
-                getSchemaPromise.setFailed(result);
             }
+            getSchemaPromise.setFailed(result);
             return;
         }
 
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 451b0ce..24a43d5 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -185,8 +185,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
                                                                
CommandGetTopicsOfNamespace_Mode mode,
                                                                uint64_t 
requestId);
 
-    Future<Result, boost::optional<SchemaInfo>> newGetSchema(const 
std::string& topicName,
-                                                             uint64_t 
requestId);
+    Future<Result, SchemaInfo> newGetSchema(const std::string& topicName, 
const std::string& version,
+                                            uint64_t requestId);
 
    private:
     struct PendingRequestData {
@@ -346,7 +346,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> 
PendingGetNamespaceTopicsMap;
     PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
 
-    typedef std::map<long, Promise<Result, boost::optional<SchemaInfo>>> 
PendingGetSchemaMap;
+    typedef std::map<long, Promise<Result, SchemaInfo>> PendingGetSchemaMap;
     PendingGetSchemaMap pendingGetSchemaRequests_;
 
     mutable std::mutex mutex_;
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index d04289f..a8fc24c 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -166,19 +166,17 @@ void ClientImpl::createProducerAsync(const std::string& 
topic, ProducerConfigura
 
     if (autoDownloadSchema) {
         auto self = shared_from_this();
-        auto confPtr = std::make_shared<ProducerConfiguration>(conf);
         lookupServicePtr_->getSchema(topicName).addListener(
-            [self, topicName, confPtr, callback](Result res, 
boost::optional<SchemaInfo> topicSchema) {
+            [self, topicName, callback](Result res, SchemaInfo topicSchema) {
                 if (res != ResultOk) {
                     callback(res, Producer());
+                    return;
                 }
-                if (topicSchema) {
-                    confPtr->setSchema(topicSchema.get());
-                }
-
+                ProducerConfiguration conf;
+                conf.setSchema(topicSchema);
                 
self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
                     std::bind(&ClientImpl::handleCreateProducer, self, 
std::placeholders::_1,
-                              std::placeholders::_2, topicName, *confPtr, 
callback));
+                              std::placeholders::_2, topicName, conf, 
callback));
             });
     } else {
         lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 8af61e7..cec9d3b 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -161,7 +161,8 @@ SharedBuffer Commands::newLookup(const std::string& topic, 
const bool authoritat
     return buffer;
 }
 
-SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t 
requestId) {
+SharedBuffer Commands::newGetSchema(const std::string& topic, const 
std::string& version,
+                                    uint64_t requestId) {
     static BaseCommand cmd;
     static std::mutex mutex;
     std::lock_guard<std::mutex> lock(mutex);
@@ -170,6 +171,9 @@ SharedBuffer Commands::newGetSchema(const std::string& 
topic, uint64_t requestId
     auto getSchema = cmd.mutable_getschema();
     getSchema->set_topic(topic);
     getSchema->set_request_id(requestId);
+    if (!version.empty()) {
+        getSchema->set_schema_version(version);
+    }
 
     const SharedBuffer buffer = writeMessageWithSize(cmd);
     cmd.clear_getschema();
diff --git a/lib/Commands.h b/lib/Commands.h
index 5d24764..65a6406 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -92,7 +92,8 @@ class Commands {
     static SharedBuffer newLookup(const std::string& topic, const bool 
authoritative, uint64_t requestId,
                                   const std::string& listenerName);
 
-    static SharedBuffer newGetSchema(const std::string& topic, uint64_t 
requestId);
+    static SharedBuffer newGetSchema(const std::string& topic, const 
std::string& version,
+                                     uint64_t requestId);
 
     static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& 
cmd, uint64_t producerId,
                                     uint64_t sequenceId, ChecksumType 
checksumType,
diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc
index 920592d..1956e01 100644
--- a/lib/HTTPLookupService.cc
+++ b/lib/HTTPLookupService.cc
@@ -25,6 +25,7 @@
 #include <boost/property_tree/ptree.hpp>
 
 #include "ExecutorService.h"
+#include "Int64SerDes.h"
 #include "LogUtils.h"
 #include "NamespaceName.h"
 #include "SchemaUtils.h"
@@ -157,8 +158,9 @@ Future<Result, NamespaceTopicsPtr> 
HTTPLookupService::getTopicsOfNamespaceAsync(
     return promise.getFuture();
 }
 
-Future<Result, boost::optional<SchemaInfo>> HTTPLookupService::getSchema(const 
TopicNamePtr &topicName) {
-    Promise<Result, boost::optional<SchemaInfo>> promise;
+Future<Result, SchemaInfo> HTTPLookupService::getSchema(const TopicNamePtr 
&topicName,
+                                                        const std::string 
&version) {
+    Promise<Result, SchemaInfo> promise;
     std::stringstream completeUrlStream;
 
     const auto &url = serviceNameResolver_.resolveHost();
@@ -171,6 +173,10 @@ Future<Result, boost::optional<SchemaInfo>> 
HTTPLookupService::getSchema(const T
                           << topicName->getCluster() << '/' << 
topicName->getNamespacePortion() << '/'
                           << topicName->getEncodedLocalName() << "/schema";
     }
+    if (!version.empty()) {
+        completeUrlStream << "/" << fromBigEndianBytes(version);
+    }
+
     
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest,
                                                  shared_from_this(), promise, 
completeUrlStream.str()));
     return promise.getFuture();
@@ -450,7 +456,7 @@ void 
HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, con
     Result result = sendHTTPRequest(completeUrl, responseData, responseCode);
 
     if (responseCode == 404) {
-        promise.setValue(boost::none);
+        promise.setFailed(ResultTopicNotFound);
     } else if (result != ResultOk) {
         promise.setFailed(result);
     } else {
diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h
index 8dc195c..a1e12fc 100644
--- a/lib/HTTPLookupService.h
+++ b/lib/HTTPLookupService.h
@@ -28,7 +28,7 @@ namespace pulsar {
 class ServiceNameResolver;
 using NamespaceTopicsPromise = Promise<Result, NamespaceTopicsPtr>;
 using NamespaceTopicsPromisePtr = std::shared_ptr<NamespaceTopicsPromise>;
-using GetSchemaPromise = Promise<Result, boost::optional<SchemaInfo>>;
+using GetSchemaPromise = Promise<Result, SchemaInfo>;
 
 class HTTPLookupService : public LookupService, public 
std::enable_shared_from_this<HTTPLookupService> {
     class CurlInitializer {
@@ -77,7 +77,7 @@ class HTTPLookupService : public LookupService, public 
std::enable_shared_from_t
 
     Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr&) override;
 
-    Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& 
topicName) override;
+    Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const 
std::string& version) override;
 
     Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
         const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) 
override;
diff --git a/lib/Int64SerDes.h b/lib/Int64SerDes.h
new file mode 100644
index 0000000..dbc5d8a
--- /dev/null
+++ b/lib/Int64SerDes.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <stdint.h>
+
+#include <boost/asio.hpp>  // for ntohl
+
+namespace pulsar {
+
+inline int64_t fromBigEndianBytes(const std::string& bytes) {
+    const auto int32Array = reinterpret_cast<const uint32_t*>(bytes.c_str());
+    return (static_cast<int64_t>(ntohl(int32Array[0])) << 32) + 
static_cast<int64_t>(ntohl(int32Array[1]));
+}
+
+inline std::string toBigEndianBytes(int64_t value) {
+    union {
+        char bytes[8];
+        uint32_t int32Array[2];
+    } u;
+    u.int32Array[0] = htonl(static_cast<int32_t>(value >> 32));
+    u.int32Array[1] = htonl(static_cast<int32_t>(value & 0xFFFFFFFF));
+    return {u.bytes, 8};
+}
+
+}  // namespace pulsar
diff --git a/lib/LookupService.h b/lib/LookupService.h
index 84dc37c..7401121 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -22,7 +22,6 @@
 #include <pulsar/Result.h>
 #include <pulsar/Schema.h>
 
-#include <boost/optional.hpp>
 #include <memory>
 #include <ostream>
 #include <vector>
@@ -77,12 +76,14 @@ class LookupService {
         const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) 
= 0;
 
     /**
-     * returns current SchemaInfo {@link SchemaInfo} for a given topic.
+     * Get the SchemaInfo for a given topic and a specific schema version.
      *
      * @param topicName topic-name
+     * @param version the schema version byte array, if it's empty, use the 
latest version
      * @return SchemaInfo
      */
-    virtual Future<Result, boost::optional<SchemaInfo>> getSchema(const 
TopicNamePtr& topicName) = 0;
+    virtual Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName,
+                                                 const std::string& version = 
"") = 0;
 
     virtual ~LookupService() {}
 };
diff --git a/lib/Message.cc b/lib/Message.cc
index 98364f1..bfd65f4 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -23,6 +23,7 @@
 
 #include <iostream>
 
+#include "Int64SerDes.h"
 #include "KeyValueImpl.h"
 #include "MessageImpl.h"
 #include "PulsarApi.pb.h"
@@ -184,6 +185,10 @@ bool Message::hasSchemaVersion() const {
     return false;
 }
 
+int64_t Message::getLongSchemaVersion() const {
+    return (impl_ && impl_->hasSchemaVersion()) ? 
fromBigEndianBytes(impl_->getSchemaVersion()) : -1L;
+}
+
 const std::string& Message::getSchemaVersion() const {
     if (!impl_) {
         return emptyString;
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index 01dd82b..c8fdaab 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -67,10 +67,10 @@ class RetryableLookupService : public LookupService,
             [this, nsName, mode] { return 
lookupService_->getTopicsOfNamespaceAsync(nsName, mode); });
     }
 
-    Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& 
topicName) override {
-        return executeAsync<boost::optional<SchemaInfo>>(
-            "get-schema" + topicName->toString(),
-            [this, topicName] { return lookupService_->getSchema(topicName); 
});
+    Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const 
std::string& version) override {
+        return executeAsync<SchemaInfo>("get-schema" + topicName->toString(), 
[this, topicName, version] {
+            return lookupService_->getSchema(topicName, version);
+        });
     }
 
     template <typename T>
diff --git a/tests/Int64SerDesTest.cc b/tests/Int64SerDesTest.cc
new file mode 100644
index 0000000..7dc662a
--- /dev/null
+++ b/tests/Int64SerDesTest.cc
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+
+#include "lib/Int64SerDes.h"
+
+using namespace pulsar;
+
+TEST(Int64SerDes, testNormal) {
+    int64_t x = 0x0102030405060708L;
+    const auto bytes = toBigEndianBytes(x);
+    ASSERT_EQ(bytes.size(), 8);
+    for (int i = 0; i < 8; i++) {
+        ASSERT_EQ(bytes[i], i + 1);
+    }
+    int64_t y = fromBigEndianBytes(bytes.data());
+    ASSERT_EQ(x, y);
+}
+
+TEST(Int64SerDes, testOverflow) {
+    int64_t x = 0x8000000000000000L;
+    const auto bytes = toBigEndianBytes(x);
+    ASSERT_EQ(bytes.size(), 8);
+    ASSERT_EQ(static_cast<unsigned char>(bytes[0]), 0x80);
+    for (int i = 1; i < 8; i++) {
+        ASSERT_EQ(bytes[i], 0x00);
+    }
+    int64_t y = fromBigEndianBytes(bytes);
+    ASSERT_EQ(x, y);
+}
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index f7ffc69..7712c4e 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -26,6 +26,8 @@
 
 #include <algorithm>
 #include <boost/exception/all.hpp>
+#include <future>
+#include <stdexcept>
 
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
@@ -42,6 +44,8 @@ DECLARE_LOG_OBJECT()
 static std::string binaryLookupUrl = "pulsar://localhost:6650";
 static std::string httpLookupUrl = "http://localhost:8080";
 
+extern std::string unique_str();
+
 TEST(LookupServiceTest, basicLookup) {
     ExecutorServiceProviderPtr service = 
std::make_shared<ExecutorServiceProvider>(1);
     AuthenticationPtr authData = AuthFactory::Disabled();
@@ -222,10 +226,11 @@ TEST(LookupServiceTest, testTimeout) {
 
 class LookupServiceTest : public ::testing::TestWithParam<std::string> {
    public:
+    void SetUp() override { client_ = Client{GetParam()}; }
     void TearDown() override { client_.close(); }
 
    protected:
-    Client client_{GetParam()};
+    Client client_{httpLookupUrl};
 };
 
 TEST_P(LookupServiceTest, basicGetNamespaceTopics) {
@@ -289,15 +294,15 @@ TEST_P(LookupServiceTest, testGetSchema) {
     auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
     auto lookup = clientImplPtr->getLookup();
 
-    boost::optional<SchemaInfo> schemaInfo;
+    SchemaInfo schemaInfo;
     auto future = lookup->getSchema(TopicName::get(topic));
     ASSERT_EQ(ResultOk, future.get(schemaInfo));
-    ASSERT_EQ(jsonSchema, schemaInfo->getSchema());
-    ASSERT_EQ(SchemaType::JSON, schemaInfo->getSchemaType());
-    ASSERT_EQ(properties, schemaInfo->getProperties());
+    ASSERT_EQ(jsonSchema, schemaInfo.getSchema());
+    ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType());
+    ASSERT_EQ(properties, schemaInfo.getProperties());
 }
 
-TEST_P(LookupServiceTest, testGetSchemaNotFund) {
+TEST_P(LookupServiceTest, testGetSchemaNotFound) {
     const std::string topic =
         "testGetSchemaNotFund" + std::to_string(time(nullptr)) + 
GetParam().substr(0, 4);
 
@@ -307,10 +312,9 @@ TEST_P(LookupServiceTest, testGetSchemaNotFund) {
     auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
     auto lookup = clientImplPtr->getLookup();
 
-    boost::optional<SchemaInfo> schemaInfo;
+    SchemaInfo schemaInfo;
     auto future = lookup->getSchema(TopicName::get(topic));
-    ASSERT_EQ(ResultOk, future.get(schemaInfo));
-    ASSERT_FALSE(schemaInfo);
+    ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo));
 }
 
 TEST_P(LookupServiceTest, testGetKeyValueSchema) {
@@ -333,15 +337,104 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) {
     auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
     auto lookup = clientImplPtr->getLookup();
 
-    boost::optional<SchemaInfo> schemaInfo;
+    SchemaInfo schemaInfo;
     auto future = lookup->getSchema(TopicName::get(topic));
     ASSERT_EQ(ResultOk, future.get(schemaInfo));
-    ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo->getSchema());
-    ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo->getSchemaType());
-    ASSERT_FALSE(schemaInfo->getProperties().empty());
+    ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema());
+    ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType());
+    ASSERT_FALSE(schemaInfo.getProperties().empty());
+}
+
+TEST_P(LookupServiceTest, testGetSchemaByVersion) {
+    const auto topic = "testGetSchemaByVersion" + unique_str() + 
GetParam().substr(0, 4);
+    const std::string schema1 = R"({
+  "type": "record",
+  "name": "User",
+  "namespace": "test",
+  "fields": [
+    {"name": "name", "type": ["null", "string"]},
+    {"name": "age", "type": "int"}
+  ]
+})";
+    const std::string schema2 = R"({
+  "type": "record",
+  "name": "User",
+  "namespace": "test",
+  "fields": [
+    {"name": "age", "type": "int"},
+    {"name": "name", "type": ["null", "string"]}
+  ]
+})";
+    ProducerConfiguration producerConf1;
+    producerConf1.setSchema(SchemaInfo{AVRO, "Avro", schema1});
+    Producer producer1;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf1, 
producer1));
+    ProducerConfiguration producerConf2;
+    producerConf2.setSchema(SchemaInfo{AVRO, "Avro", schema2});
+    Producer producer2;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf2, 
producer2));
+
+    // Though these messages are invalid, the C++ client can send them 
successfully
+    producer1.send(MessageBuilder().setContent("msg0").build());
+    producer2.send(MessageBuilder().setContent("msg1").build());
+
+    ConsumerConfiguration consumerConf;
+    consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client_.subscribe(topic, "sub", consumerConf, 
consumer));
+
+    Message msg1;
+    ASSERT_EQ(ResultOk, consumer.receive(msg1, 3000));
+    Message msg2;
+    ASSERT_EQ(ResultOk, consumer.receive(msg2, 3000));
+
+    auto getSchemaInfo = [this](const std::string& topic, int64_t version) {
+        std::promise<SchemaInfo> p;
+        client_.getSchemaInfoAsync(topic, version, [&p](Result result, const 
SchemaInfo& info) {
+            if (result == ResultOk) {
+                p.set_value(info);
+            } else {
+                
p.set_exception(std::make_exception_ptr(std::runtime_error(strResult(result))));
+            }
+        });
+        return p.get_future().get();
+    };
+    {
+        ASSERT_EQ(msg1.getLongSchemaVersion(), 0);
+        const auto info = getSchemaInfo(topic, 0);
+        ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO);
+        ASSERT_EQ(info.getSchema(), schema1);
+    }
+    {
+        ASSERT_EQ(msg2.getLongSchemaVersion(), 1);
+        const auto info = getSchemaInfo(topic, 1);
+        ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO);
+        ASSERT_EQ(info.getSchema(), schema2);
+    }
+    {
+        const auto info = getSchemaInfo(topic, -1);
+        ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO);
+        ASSERT_EQ(info.getSchema(), schema2);
+    }
+    try {
+        getSchemaInfo(topic, 2);
+        FAIL();
+    } catch (const std::runtime_error& e) {
+        ASSERT_EQ(std::string{e.what()}, strResult(ResultTopicNotFound));
+    }
+    try {
+        getSchemaInfo(topic + "-not-exist", 0);
+        FAIL();
+    } catch (const std::runtime_error& e) {
+        ASSERT_EQ(std::string{e.what()}, strResult(ResultTopicNotFound));
+    }
+
+    consumer.close();
+    producer1.close();
+    producer2.close();
 }
 
-INSTANTIATE_TEST_CASE_P(Pulsar, LookupServiceTest, 
::testing::Values(binaryLookupUrl, httpLookupUrl));
+INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, 
::testing::Values(binaryLookupUrl, httpLookupUrl));
 
 class BinaryProtoLookupServiceRedirectTestHelper : public 
BinaryProtoLookupService {
    public:

Reply via email to