This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 96507ce [feat] Support auto download schema when create producer.
(#157)
96507ce is described below
commit 96507cecdc90f10eca5febc48a0623f72d338615
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jan 19 16:40:35 2023 +0800
[feat] Support auto download schema when create producer. (#157)
### Motivation
If a topic already has a schema but the producer does not know it, the producer
should still be able to send data to the topic. (One such scenario is the DLQ
sending a message; please refer to #139 **[Verifying this change][5]**.)
### Modifications
- When creating a producer with the `autoDownloadSchema` parameter, fetch the
schema of the topic and use it.
- Support `getSchema` on `LookupService`(HTTP and Binary).
### Verifying this change
- Add `LookupServiceTest` unit test to cover `getSchema` logic.
- Add `SchemaTest.testAutoDownloadSchema` unit test to cover creating
producer success when the topic has a schema.
Co-authored-by: Baodi Shi <[email protected]>
---
include/pulsar/Schema.h | 3 +-
lib/BinaryProtoLookupService.cc | 37 ++++++++++++++
lib/BinaryProtoLookupService.h | 7 +++
lib/ClientConnection.cc | 63 +++++++++++++++++++++++
lib/ClientConnection.h | 6 +++
lib/ClientImpl.cc | 27 ++++++++--
lib/ClientImpl.h | 6 ++-
lib/Commands.cc | 16 ++++++
lib/Commands.h | 2 +
lib/HTTPLookupService.cc | 109 ++++++++++++++++++++++++++++++++++++----
lib/HTTPLookupService.h | 6 +++
lib/LookupService.h | 10 ++++
lib/RetryableLookupService.h | 6 +++
lib/Schema.cc | 67 +++++++++++++++---------
lib/SchemaUtils.h | 55 ++++++++++++++++++++
test-conf/standalone-ssl.conf | 3 ++
tests/LookupServiceTest.cc | 82 ++++++++++++++++++++++++++++++
tests/SchemaTest.cc | 39 ++++++++++++--
18 files changed, 501 insertions(+), 43 deletions(-)
diff --git a/include/pulsar/Schema.h b/include/pulsar/Schema.h
index ad64c0b..93dac4f 100644
--- a/include/pulsar/Schema.h
+++ b/include/pulsar/Schema.h
@@ -134,6 +134,8 @@ enum SchemaType
// Return string representation of result code
PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType);
+/**
+ * Convert a schema type name (for example "JSON" or "KEY_VALUE") to the corresponding SchemaType.
+ * Matching is exact and case-sensitive.
+ *
+ * @param schemaTypeStr the schema type name
+ * @return the matching SchemaType
+ * @throws std::invalid_argument if the name does not match any known schema type
+ */
+PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr);
+
class SchemaInfoImpl;
typedef std::map<std::string, std::string> StringMap;
@@ -195,7 +197,6 @@ class PULSAR_PUBLIC SchemaInfo {
private:
typedef std::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr;
SchemaInfoImplPtr impl_;
- static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
};
} // namespace pulsar
diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index b925db4..9ce3369 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -155,6 +155,43 @@ Future<Result, NamespaceTopicsPtr>
BinaryProtoLookupService::getTopicsOfNamespac
return promise->getFuture();
}
+/**
+ * Fetch the schema of `topicName` over the binary protocol.
+ *
+ * A connection to the resolved service host is obtained asynchronously; once it is available
+ * sendGetSchemaRequest() issues the GET_SCHEMA command and completes the returned future.
+ * A null topic name fails immediately with ResultInvalidTopicName.
+ */
+Future<Result, boost::optional<SchemaInfo>> BinaryProtoLookupService::getSchema(
+    const TopicNamePtr& topicName) {
+    auto schemaPromise = std::make_shared<Promise<Result, boost::optional<SchemaInfo>>>();
+    if (topicName) {
+        const std::string fullTopicName = topicName->toString();
+        cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
+            .addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, fullTopicName,
+                                   std::placeholders::_1, std::placeholders::_2, schemaPromise));
+    } else {
+        schemaPromise->setFailed(ResultInvalidTopicName);
+    }
+    return schemaPromise->getFuture();
+}
+
+/**
+ * Connection-pool callback for getSchema(): sends GET_SCHEMA for `topicName` on the obtained
+ * connection and forwards the response (or failure) to `promise`.
+ *
+ * @param topicName the full topic name
+ * @param result outcome of obtaining the connection
+ * @param clientCnx weak reference to the connection; it may have expired by the time this runs
+ * @param promise completed with the schema, boost::none, or a failure result
+ */
+void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, Result result,
+                                                    const ClientConnectionWeakPtr& clientCnx,
+                                                    GetSchemaPromisePtr promise) {
+    if (result != ResultOk) {
+        promise->setFailed(result);
+        return;
+    }
+
+    ClientConnectionPtr conn = clientCnx.lock();
+    if (!conn) {
+        // The weak reference expired: there is no connection to send on, so fail instead of
+        // dereferencing a null pointer.
+        promise->setFailed(ResultNotConnected);
+        return;
+    }
+
+    uint64_t requestId = newRequestId();
+    LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topicName);
+
+    conn->newGetSchema(topicName, requestId)
+        .addListener([promise](Result getSchemaResult, boost::optional<SchemaInfo> schemaInfo) {
+            if (getSchemaResult != ResultOk) {
+                promise->setFailed(getSchemaResult);
+                return;
+            }
+            promise->setValue(schemaInfo);
+        });
+}
+
void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const
std::string& nsName, Result result,
const
ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise) {
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index 9adb648..9740fb1 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -20,6 +20,7 @@
#define _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_
#include <pulsar/Authentication.h>
+#include <pulsar/Schema.h>
#include <mutex>
@@ -32,6 +33,7 @@ class ConnectionPool;
class LookupDataResult;
class ServiceNameResolver;
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result,
NamespaceTopicsPtr>>;
+// Shared promise for a schema lookup: completed with the topic's schema, boost::none when no schema
+// is found, or a failure result.
+using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, boost::optional<SchemaInfo>>>;
class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
@@ -45,6 +47,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public
LookupService {
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const
NamespaceNamePtr& nsName) override;
+    // Fetches the topic's schema with a GET_SCHEMA command on a pooled broker connection.
+    Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override;
+
private:
std::mutex mutex_;
uint64_t requestIdGenerator_ = 0;
@@ -68,6 +72,9 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public
LookupService {
const ClientConnectionWeakPtr&
clientCnx,
NamespaceTopicsPromisePtr promise);
+    // Connection-pool callback used by getSchema(): sends GET_SCHEMA for `topicName` once the
+    // connection is available and completes `promise` with the outcome.
+    void sendGetSchemaRequest(const std::string& topicName, Result result,
+                              const ClientConnectionWeakPtr& clientCnx, GetSchemaPromisePtr promise);
+
void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr
topicsPtr,
NamespaceTopicsPromisePtr promise);
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 260aacd..16373c6 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1308,6 +1308,52 @@ void
ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
break;
}
+            // Response to a GET_SCHEMA request sent by newGetSchema(): resolve the matching pending
+            // promise, then convert the protobuf schema into a SchemaInfo.
+            case BaseCommand::GET_SCHEMA_RESPONSE: {
+                const auto& response = incomingCmd.getschemaresponse();
+                LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: "
+                                     << response.request_id());
+                Lock lock(mutex_);
+                auto it = pendingGetSchemaRequests_.find(response.request_id());
+                if (it != pendingGetSchemaRequests_.end()) {
+                    // Take the promise out of the map and release the lock before completing it,
+                    // so listeners do not run while mutex_ is held.
+                    Promise<Result, boost::optional<SchemaInfo>> getSchemaPromise = it->second;
+                    pendingGetSchemaRequests_.erase(it);
+                    lock.unlock();
+
+                    if (response.has_error_code()) {
+                        // TopicNotFound is reported to the caller as "no schema" (boost::none) rather
+                        // than as a failure; every other error code fails the promise.
+                        if (response.error_code() == proto::TopicNotFound) {
+                            getSchemaPromise.setValue(boost::none);
+                        } else {
+                            Result result = getResult(response.error_code(), response.error_message());
+                            LOG_WARN(cnxString_ << "Received error GetSchemaResponse from server "
+                                                << result
+                                                << (response.has_error_message()
+                                                        ? (" (" + response.error_message() + ")")
+                                                        : "")
+                                                << " -- req_id: " << response.request_id());
+                            getSchemaPromise.setFailed(result);
+                        }
+                        // NOTE(review): `return` leaves handleIncomingCommand entirely rather than just
+                        // this switch (the other exits use `break`); confirm nothing after the switch
+                        // needs to run.
+                        return;
+                    }
+
+                    const auto& schema = response.schema();
+                    const auto& properMap = schema.properties();
+                    // Copy the protobuf key/value property list into a StringMap.
+                    StringMap properties;
+                    for (auto kv = properMap.begin(); kv != properMap.end(); ++kv) {
+                        properties[kv->key()] = kv->value();
+                    }
+                    // The schema name is left empty. NOTE(review): assumes the protobuf schema type
+                    // values coincide numerically with pulsar::SchemaType — confirm.
+                    SchemaInfo schemaInfo(static_cast<SchemaType>(schema.type()), "",
+                                          schema.schema_data(), properties);
+                    getSchemaPromise.setValue(schemaInfo);
+                } else {
+                    lock.unlock();
+                    // NOTE(review): unlike the other logs in this case, this one omits cnxString_.
+                    LOG_WARN(
+                        "GetSchemaResponse command - Received unknown request id from "
+                        "server: "
+                        << response.request_id());
+                }
+                break;
+            }
+
default: {
LOG_WARN(cnxString_ << "Received invalid message from
server");
close();
@@ -1708,6 +1754,23 @@ Future<Result, NamespaceTopicsPtr>
ClientConnection::newGetTopicsOfNamespace(con
return promise.getFuture();
}
+/**
+ * Register a pending GET_SCHEMA request under `requestId` and send it to the broker.
+ *
+ * The returned future is completed by the GET_SCHEMA_RESPONSE handler. If the connection is
+ * already closed, it fails immediately with ResultNotConnected and nothing is sent.
+ */
+Future<Result, boost::optional<SchemaInfo>> ClientConnection::newGetSchema(const std::string& topicName,
+                                                                           uint64_t requestId) {
+    Lock guard(mutex_);
+    Promise<Result, boost::optional<SchemaInfo>> schemaPromise;
+    if (!isClosed()) {
+        // Register before sending so the response handler can always find the request.
+        pendingGetSchemaRequests_.insert(std::make_pair(requestId, schemaPromise));
+        guard.unlock();
+        sendCommand(Commands::newGetSchema(topicName, requestId));
+        return schemaPromise.getFuture();
+    }
+
+    guard.unlock();
+    LOG_ERROR(cnxString_ << "Client is not connected to the broker");
+    schemaPromise.setFailed(ResultNotConnected);
+    return schemaPromise.getFuture();
+}
+
void ClientConnection::closeSocket() {
boost::system::error_code err;
if (socket_) {
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 0f322a8..c77ca78 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -169,6 +169,9 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const
std::string& nsName, uint64_t requestId);
+    /**
+     * Send a GET_SCHEMA request for `topicName` on this connection.
+     *
+     * @param topicName the full topic name
+     * @param requestId id used to match the broker's response to this request
+     * @return a future completed with the topic's schema, with boost::none if the broker answers
+     *         TopicNotFound, or failed (ResultNotConnected if the connection is already closed)
+     */
+    Future<Result, boost::optional<SchemaInfo>> newGetSchema(const std::string& topicName,
+                                                             uint64_t requestId);
+
private:
struct PendingRequestData {
Promise<Result, ResponseData> promise;
@@ -327,6 +330,9 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>>
PendingGetNamespaceTopicsMap;
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
+    // Outstanding GET_SCHEMA requests keyed by request id; an entry is erased when its response
+    // arrives. NOTE(review): keyed by `long` while request ids are uint64_t, matching the other
+    // pending-request maps — confirm this is intended.
+    typedef std::map<long, Promise<Result, boost::optional<SchemaInfo>>> PendingGetSchemaMap;
+    PendingGetSchemaMap pendingGetSchemaRequests_;
+
mutable std::mutex mutex_;
typedef std::unique_lock<std::mutex> Lock;
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index a9c1653..ec9de8a 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -142,7 +142,7 @@ ExecutorServiceProviderPtr
ClientImpl::getPartitionListenerExecutorProvider() {
LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }
void ClientImpl::createProducerAsync(const std::string& topic,
ProducerConfiguration conf,
- CreateProducerCallback callback) {
+ CreateProducerCallback callback, bool
autoDownloadSchema) {
if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) {
throw std::invalid_argument("Batching and chunking of messages can't
be enabled together");
}
@@ -159,9 +159,28 @@ void ClientImpl::createProducerAsync(const std::string&
topic, ProducerConfigura
return;
}
}
- lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
- std::bind(&ClientImpl::handleCreateProducer, shared_from_this(),
std::placeholders::_1,
- std::placeholders::_2, topicName, conf, callback));
+
+    if (autoDownloadSchema) {
+        // Fetch the topic's current schema first and, if one exists, use it for the producer.
+        auto self = shared_from_this();
+        auto confPtr = std::make_shared<ProducerConfiguration>(conf);
+        lookupServicePtr_->getSchema(topicName).addListener(
+            [self, topicName, confPtr, callback](Result res, boost::optional<SchemaInfo> topicSchema) {
+                if (res != ResultOk) {
+                    // Report the failure and stop: without this return the producer would still be
+                    // created and `callback` would be invoked a second time.
+                    callback(res, Producer());
+                    return;
+                }
+                if (topicSchema) {
+                    confPtr->setSchema(topicSchema.get());
+                }
+
+                self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+                    std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1,
+                              std::placeholders::_2, topicName, *confPtr, callback));
+            });
+    } else {
+        lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+            std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1,
+                      std::placeholders::_2, topicName, conf, callback));
+    }
}
void ClientImpl::handleCreateProducer(const Result result, const
LookupDataResultPtr partitionMetadata,
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 8a39396..af2559d 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -68,8 +68,12 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
bool poolConnections);
~ClientImpl();
+    /**
+     * Asynchronously create a producer on `topic`; `callback` receives the result and the producer.
+     *
+     * @param autoDownloadSchema when true, the schema currently registered on the topic (if any) is
+     *        fetched first and set on the producer configuration before the producer is created
+     */
void createProducerAsync(const std::string& topic, ProducerConfiguration conf,
- CreateProducerCallback callback);
+                             CreateProducerCallback callback, bool autoDownloadSchema = false);
void subscribeAsync(const std::string& topic, const std::string&
subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback
callback);
diff --git a/lib/Commands.cc b/lib/Commands.cc
index db993bd..bfd9a46 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -161,6 +161,21 @@ SharedBuffer Commands::newLookup(const std::string& topic,
const bool authoritat
return buffer;
}
+/**
+ * Serialize a GET_SCHEMA command for `topic` tagged with `requestId`.
+ */
+SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId) {
+    static BaseCommand cmd;
+    static std::mutex mutex;
+    // `cmd` is a shared scratch object: hold the lock across build, serialize and reset.
+    std::lock_guard<std::mutex> guard(mutex);
+
+    cmd.set_type(BaseCommand::GET_SCHEMA);
+    auto* request = cmd.mutable_getschema();
+    request->set_request_id(requestId);
+    request->set_topic(topic);
+
+    const SharedBuffer serialized = writeMessageWithSize(cmd);
+    // Drop the sub-message so the next caller starts from a clean command.
+    cmd.clear_getschema();
+    return serialized;
+}
+
SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t
requestId) {
static BaseCommand cmd;
static std::mutex mutex;
@@ -872,5 +887,6 @@ bool
Commands::peerSupportsMultiMessageAcknowledgement(int32_t peerVersion) {
bool Commands::peerSupportsJsonSchemaAvroFormat(int32_t peerVersion) { return
peerVersion >= proto::v13; }
bool Commands::peerSupportsGetOrCreateSchema(int32_t peerVersion) { return
peerVersion >= proto::v15; }
+
} // namespace pulsar
/* namespace pulsar */
diff --git a/lib/Commands.h b/lib/Commands.h
index e7746d2..7d13e2a 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -91,6 +91,8 @@ class Commands {
static SharedBuffer newLookup(const std::string& topic, const bool
authoritative, uint64_t requestId,
const std::string& listenerName);
+    // Builds a serialized GET_SCHEMA command for `topic` tagged with `requestId`.
+    static SharedBuffer newGetSchema(const std::string& topic, uint64_t requestId);
+
static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand&
cmd, uint64_t producerId,
uint64_t sequenceId, ChecksumType
checksumType,
const proto::MessageMetadata& metadata,
const SharedBuffer& payload);
diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc
index 8167b64..ae719a5 100644
--- a/lib/HTTPLookupService.cc
+++ b/lib/HTTPLookupService.cc
@@ -27,6 +27,7 @@
#include "ExecutorService.h"
#include "LogUtils.h"
#include "NamespaceName.h"
+#include "SchemaUtils.h"
#include "ServiceNameResolver.h"
#include "TopicName.h"
namespace ptree = boost::property_tree;
@@ -143,6 +144,25 @@ Future<Result, NamespaceTopicsPtr>
HTTPLookupService::getTopicsOfNamespaceAsync(
return promise.getFuture();
}
+/**
+ * Fetch the schema of `topicName` through the admin REST API
+ * (`.../schemas/<tenant>/[<cluster>/]<namespace>/<topic>/schema`).
+ *
+ * The HTTP request runs on an executor; handleGetSchemaHTTPRequest() completes the future.
+ * A null topic name fails immediately with ResultInvalidTopicName.
+ */
+Future<Result, boost::optional<SchemaInfo>> HTTPLookupService::getSchema(const TopicNamePtr &topicName) {
+    Promise<Result, boost::optional<SchemaInfo>> promise;
+    if (!topicName) {
+        // Reject a null topic name up front instead of dereferencing it below.
+        promise.setFailed(ResultInvalidTopicName);
+        return promise.getFuture();
+    }
+
+    std::stringstream completeUrlStream;
+    const auto &url = serviceNameResolver_.resolveHost();
+    if (topicName->isV2Topic()) {
+        completeUrlStream << url << ADMIN_PATH_V2 << "schemas/" << topicName->getProperty() << '/'
+                          << topicName->getNamespacePortion() << '/' << topicName->getEncodedLocalName()
+                          << "/schema";
+    } else {
+        completeUrlStream << url << ADMIN_PATH_V1 << "schemas/" << topicName->getProperty() << '/'
+                          << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/'
+                          << topicName->getEncodedLocalName() << "/schema";
+    }
+
+    executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest,
+                                                 shared_from_this(), promise, completeUrlStream.str()));
+    return promise.getFuture();
+}
+
static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb,
void *responseDataPtr) {
((std::string *)responseDataPtr)->append((char *)contents, size * nmemb);
return size * nmemb;
@@ -161,6 +181,12 @@ void
HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise
}
Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &responseData) {
+    // Convenience overload for callers that do not need the HTTP status code: delegates to the
+    // three-argument overload and discards the code.
+    long responseCode = -1;
+    return sendHTTPRequest(completeUrl, responseData, responseCode);
+}
+
+Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string
&responseData,
+ long &responseCode) {
uint16_t reqCount = 0;
Result retResult = ResultOk;
while (++reqCount <= MAX_HTTP_REDIRECTS) {
@@ -253,9 +279,8 @@ Result HTTPLookupService::sendHTTPRequest(std::string
completeUrl, std::string &
// Make get call to server
res = curl_easy_perform(handle);
- long response_code = -1;
- curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
- LOG_INFO("Response received for url " << completeUrl << "
response_code " << response_code
+ curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &responseCode);
+ LOG_INFO("Response received for url " << completeUrl << " responseCode
" << responseCode
<< " curl res " << res);
// Free header list
@@ -263,12 +288,9 @@ Result HTTPLookupService::sendHTTPRequest(std::string
completeUrl, std::string &
switch (res) {
case CURLE_OK:
- long response_code;
- curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE,
&response_code);
- LOG_INFO("Response received for url " << completeUrl << " code
" << response_code);
- if (response_code == 200) {
+ if (responseCode == 200) {
retResult = ResultOk;
- } else if (needRedirection(response_code)) {
+ } else if (needRedirection(responseCode)) {
char *url = NULL;
curl_easy_getinfo(handle, CURLINFO_REDIRECT_URL, &url);
LOG_INFO("Response from url " << completeUrl << " to new
url " << url);
@@ -302,7 +324,7 @@ Result HTTPLookupService::sendHTTPRequest(std::string
completeUrl, std::string &
break;
}
curl_easy_cleanup(handle);
- if (!needRedirection(response_code)) {
+ if (!needRedirection(responseCode)) {
break;
}
}
@@ -409,4 +431,73 @@ void
HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std
}
}
+/**
+ * Perform the schema REST request and complete `promise` from the response.
+ *
+ * - HTTP 404: completed with boost::none (the topic has no schema).
+ * - Other transport/HTTP failures: failed with the Result from sendHTTPRequest().
+ * - Malformed JSON, missing fields, or an unknown schema type: failed with ResultInvalidMessage.
+ *   Parsing errors are caught here so they cannot escape the executor task.
+ *
+ * For KEY_VALUE schemas, the JSON "data" holds "key" and "value" objects; they are re-serialized
+ * and merged into the binary KeyValue layout with mergeKeyValueSchema().
+ */
+void HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, const std::string completeUrl) {
+    std::string responseData;
+    long responseCode = -1;
+    Result result = sendHTTPRequest(completeUrl, responseData, responseCode);
+
+    if (responseCode == 404) {
+        promise.setValue(boost::none);
+        return;
+    }
+    if (result != ResultOk) {
+        promise.setFailed(result);
+        return;
+    }
+
+    boost::optional<SchemaInfo> schemaInfo;
+    try {
+        ptree::ptree root;
+        std::stringstream stream(responseData);
+        ptree::read_json(stream, root);
+
+        const std::string defaultNotFoundString = "Not found";
+        const auto schemaTypeStr = root.get<std::string>("type", defaultNotFoundString);
+        if (schemaTypeStr == defaultNotFoundString) {
+            LOG_ERROR("malformed json! - type not present: " << responseData);
+            promise.setFailed(ResultInvalidMessage);
+            return;
+        }
+        auto schemaData = root.get<std::string>("data", defaultNotFoundString);
+        if (schemaData == defaultNotFoundString) {
+            LOG_ERROR("malformed json! - data not present: " << responseData);
+            promise.setFailed(ResultInvalidMessage);
+            return;
+        }
+
+        // Throws std::invalid_argument for a type name it does not recognize.
+        const auto schemaType = enumSchemaType(schemaTypeStr);
+        if (schemaType == KEY_VALUE) {
+            ptree::ptree kvRoot;
+            std::stringstream kvStream(schemaData);
+            ptree::read_json(kvStream, kvRoot);
+            // get_child throws ptree_bad_path if "key"/"value" is missing.
+            std::stringstream keyStream;
+            ptree::write_json(keyStream, kvRoot.get_child("key"), false);
+            std::stringstream valueStream;
+            ptree::write_json(valueStream, kvRoot.get_child("value"), false);
+            auto keyData = keyStream.str();
+            auto valueData = valueStream.str();
+            // write_json terminates its output with a line break; strip it before merging.
+            if (!keyData.empty() && keyData.back() == '\n') {
+                keyData.pop_back();
+            }
+            if (!valueData.empty() && valueData.back() == '\n') {
+                valueData.pop_back();
+            }
+            schemaData = mergeKeyValueSchema(keyData, valueData);
+        }
+
+        StringMap properties;
+        // Tolerate a response without a "properties" object instead of throwing ptree_bad_path.
+        if (const auto propertiesTree = root.get_child_optional("properties")) {
+            for (const auto &item : *propertiesTree) {
+                properties[item.first] = item.second.get_value<std::string>();
+            }
+        }
+        schemaInfo = SchemaInfo(schemaType, "", schemaData, properties);
+    } catch (const ptree::ptree_error &e) {
+        LOG_ERROR("Failed to parse json of schema: " << e.what() << "\nInput Json = " << responseData);
+        promise.setFailed(ResultInvalidMessage);
+        return;
+    } catch (const std::invalid_argument &e) {
+        LOG_ERROR("Unsupported schema type: " << e.what() << "\nInput Json = " << responseData);
+        promise.setFailed(ResultInvalidMessage);
+        return;
+    }
+
+    promise.setValue(schemaInfo);
+}
+
} // namespace pulsar
diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h
index 929d7ab..f76d3e6 100644
--- a/lib/HTTPLookupService.h
+++ b/lib/HTTPLookupService.h
@@ -28,6 +28,7 @@ namespace pulsar {
class ServiceNameResolver;
using NamespaceTopicsPromise = Promise<Result, NamespaceTopicsPtr>;
using NamespaceTopicsPromisePtr = std::shared_ptr<NamespaceTopicsPromise>;
+// Promise for a schema lookup: completed with the topic's schema, boost::none when no schema is
+// found, or a failure result.
+using GetSchemaPromise = Promise<Result, boost::optional<SchemaInfo>>;
class HTTPLookupService : public LookupService, public
std::enable_shared_from_this<HTTPLookupService> {
class CurlInitializer {
@@ -62,9 +63,12 @@ class HTTPLookupService : public LookupService, public
std::enable_shared_from_t
void handleLookupHTTPRequest(LookupPromise, const std::string,
RequestType);
void handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise,
const std::string completeUrl);
+    // Performs the HTTP GET for a topic's schema and completes `promise` from the response.
+    void handleGetSchemaHTTPRequest(GetSchemaPromise promise, const std::string completeUrl);
Result sendHTTPRequest(std::string completeUrl, std::string& responseData);
+    // Same as the overload above, but also reports the HTTP status code through `responseCode`.
+    Result sendHTTPRequest(std::string completeUrl, std::string& responseData, long& responseCode);
+
public:
HTTPLookupService(ServiceNameResolver&, const ClientConfiguration&, const
AuthenticationPtr&);
@@ -72,6 +76,8 @@ class HTTPLookupService : public LookupService, public
std::enable_shared_from_t
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr&) override;
+    // Fetches the topic's schema from the admin REST API; HTTP 404 yields boost::none.
+    Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override;
+
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const
NamespaceNamePtr& nsName) override;
};
} // namespace pulsar
diff --git a/lib/LookupService.h b/lib/LookupService.h
index 6af290c..f1c5de8 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -20,7 +20,9 @@
#define PULSAR_CPP_LOOKUPSERVICE_H
#include <pulsar/Result.h>
+#include <pulsar/Schema.h>
+#include <boost/optional.hpp>
#include <memory>
#include <ostream>
#include <vector>
@@ -72,6 +74,14 @@ class LookupService {
*/
virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const
NamespaceNamePtr& nsName) = 0;
+    /**
+     * Get the schema currently registered for a topic.
+     *
+     * @param topicName the topic to query
+     * @return a future completed with the topic's SchemaInfo, or with boost::none when no schema is
+     *         found for the topic; the future fails if the lookup itself fails
+     */
+    virtual Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) = 0;
+
virtual ~LookupService() {}
};
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index 7d704ec..f16ff75 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -66,6 +66,12 @@ class RetryableLookupService : public LookupService,
[this, nsName] { return
lookupService_->getTopicsOfNamespaceAsync(nsName); });
}
+    // Delegates to the wrapped lookup service through executeAsync(), keyed per topic.
+    Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override {
+        const std::string taskKey = "get-schema" + topicName->toString();
+        auto fetchSchema = [this, topicName] { return lookupService_->getSchema(topicName); };
+        return executeAsync<boost::optional<SchemaInfo>>(taskKey, fetchSchema);
+    }
+
template <typename T>
Future<Result, T> executeAsync(const std::string& key,
std::function<Future<Result, T>()> f) {
Promise<Result, T> promise;
diff --git a/lib/Schema.cc b/lib/Schema.cc
index 300c91d..d77822a 100644
--- a/lib/Schema.cc
+++ b/lib/Schema.cc
@@ -25,7 +25,8 @@
#include <map>
#include <memory>
-#include "SharedBuffer.h"
+#include "SchemaUtils.h"
+
using boost::property_tree::ptree;
using boost::property_tree::read_json;
using boost::property_tree::write_json;
@@ -40,14 +41,6 @@ PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s,
pulsar::KeyValueEncoding
namespace pulsar {
-static const std::string KEY_SCHEMA_NAME = "key.schema.name";
-static const std::string KEY_SCHEMA_TYPE = "key.schema.type";
-static const std::string KEY_SCHEMA_PROPS = "key.schema.properties";
-static const std::string VALUE_SCHEMA_NAME = "value.schema.name";
-static const std::string VALUE_SCHEMA_TYPE = "value.schema.type";
-static const std::string VALUE_SCHEMA_PROPS = "value.schema.properties";
-static const std::string KV_ENCODING_TYPE = "kv.encoding.type";
-
PULSAR_PUBLIC const char *strEncodingType(KeyValueEncodingType encodingType) {
switch (encodingType) {
case KeyValueEncodingType::INLINE:
@@ -112,6 +105,44 @@ PULSAR_PUBLIC const char *strSchemaType(SchemaType
schemaType) {
return "UnknownSchemaType";
}
+/**
+ * Map a schema type name (e.g. the "type" field of the schema REST response) to its SchemaType.
+ * Matching is exact and case-sensitive; an unknown name throws std::invalid_argument.
+ */
+PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr) {
+    static const std::map<std::string, SchemaType> typesByName = {
+        {"NONE", NONE},
+        {"STRING", STRING},
+        {"INT8", INT8},
+        {"INT16", INT16},
+        {"INT32", INT32},
+        {"INT64", INT64},
+        {"FLOAT", FLOAT},
+        {"DOUBLE", DOUBLE},
+        {"BYTES", BYTES},
+        {"JSON", JSON},
+        {"PROTOBUF", PROTOBUF},
+        {"AVRO", AVRO},
+        {"AUTO_CONSUME", AUTO_CONSUME},
+        {"AUTO_PUBLISH", AUTO_PUBLISH},
+        {"KEY_VALUE", KEY_VALUE},
+        {"PROTOBUF_NATIVE", PROTOBUF_NATIVE},
+    };
+
+    const auto match = typesByName.find(schemaTypeStr);
+    if (match == typesByName.end()) {
+        throw std::invalid_argument("No match schema type: " + schemaTypeStr);
+    }
+    return match->second;
+}
+
class PULSAR_PUBLIC SchemaInfoImpl {
public:
const std::string name_;
@@ -134,18 +165,6 @@ SchemaInfo::SchemaInfo(SchemaType schemaType, const
std::string &name, const std
SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo
&valueSchema,
const KeyValueEncodingType &keyValueEncodingType) {
- std::string keySchemaStr = keySchema.getSchema();
- std::string valueSchemaStr = valueSchema.getSchema();
- uint32_t keySize = keySchemaStr.size();
- uint32_t valueSize = valueSchemaStr.size();
-
- auto buffSize = sizeof keySize + keySize + sizeof valueSize + valueSize;
- SharedBuffer buffer = SharedBuffer::allocate(buffSize);
- buffer.writeUnsignedInt(keySize == 0 ? INVALID_SIZE :
static_cast<uint32_t>(keySize));
- buffer.write(keySchemaStr.c_str(), static_cast<uint32_t>(keySize));
- buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE :
static_cast<uint32_t>(valueSize));
- buffer.write(valueSchemaStr.c_str(), static_cast<uint32_t>(valueSize));
-
auto writeJson = [](const StringMap &properties) {
ptree pt;
for (auto &entry : properties) {
@@ -167,8 +186,10 @@ SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const
SchemaInfo &valueSchem
properties.emplace(VALUE_SCHEMA_PROPS,
writeJson(valueSchema.getProperties()));
properties.emplace(KV_ENCODING_TYPE,
strEncodingType(keyValueEncodingType));
- impl_ = std::make_shared<SchemaInfoImpl>(KEY_VALUE, "KeyValue",
std::string(buffer.data(), buffSize),
- properties);
+ std::string keySchemaStr = keySchema.getSchema();
+ std::string valueSchemaStr = valueSchema.getSchema();
+ impl_ = std::make_shared<SchemaInfoImpl>(KEY_VALUE, "KeyValue",
+ mergeKeyValueSchema(keySchemaStr,
valueSchemaStr), properties);
}
SchemaType SchemaInfo::getSchemaType() const { return impl_->type_; }
diff --git a/lib/SchemaUtils.h b/lib/SchemaUtils.h
new file mode 100644
index 0000000..a298a03
--- /dev/null
+++ b/lib/SchemaUtils.h
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef SCHEMA_UTILS_HPP_
+#define SCHEMA_UTILS_HPP_
+
+#include <cstdint>
+#include <string>
+
+#include "SharedBuffer.h"
+
+namespace pulsar {
+
+// Written in place of a length prefix when the corresponding schema data is empty.
+static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
+
+// Property keys describing the key and value schemas (and encoding) of a KeyValue schema.
+static const std::string KEY_SCHEMA_NAME = "key.schema.name";
+static const std::string KEY_SCHEMA_TYPE = "key.schema.type";
+static const std::string KEY_SCHEMA_PROPS = "key.schema.properties";
+static const std::string VALUE_SCHEMA_NAME = "value.schema.name";
+static const std::string VALUE_SCHEMA_TYPE = "value.schema.type";
+static const std::string VALUE_SCHEMA_PROPS = "value.schema.properties";
+static const std::string KV_ENCODING_TYPE = "kv.encoding.type";
+
+/**
+ * Merge key and value schema data into the binary layout used by KeyValue schemas:
+ * [uint32 key length][key data][uint32 value length][value data], where the length of an empty
+ * schema is written as INVALID_SIZE instead of 0.
+ *
+ * Declared `inline` (not `static`) so every translation unit including this header shares one
+ * definition instead of getting its own, possibly unused, copy.
+ *
+ * @param keySchemaData the schema data of the key
+ * @param valueSchemaData the schema data of the value
+ * @return the merged schema data
+ */
+inline std::string mergeKeyValueSchema(const std::string& keySchemaData, const std::string& valueSchemaData) {
+    const uint32_t keySize = static_cast<uint32_t>(keySchemaData.size());
+    const uint32_t valueSize = static_cast<uint32_t>(valueSchemaData.size());
+
+    const auto buffSize = sizeof keySize + keySize + sizeof valueSize + valueSize;
+    SharedBuffer buffer = SharedBuffer::allocate(buffSize);
+    buffer.writeUnsignedInt(keySize == 0 ? INVALID_SIZE : keySize);
+    buffer.write(keySchemaData.c_str(), keySize);
+    buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE : valueSize);
+    buffer.write(valueSchemaData.c_str(), valueSize);
+
+    return std::string(buffer.data(), buffSize);
+}
+
+}  // namespace pulsar
+
+#endif /* SCHEMA_UTILS_HPP_ */
diff --git a/test-conf/standalone-ssl.conf b/test-conf/standalone-ssl.conf
index ee7c16c..1e54360 100644
--- a/test-conf/standalone-ssl.conf
+++ b/test-conf/standalone-ssl.conf
@@ -50,6 +50,9 @@ brokerShutdownTimeoutMs=3000
# Enable backlog quota check. Enforces action on topic when the quota is
reached
backlogQuotaCheckEnabled=true
+# Enforce schema validation: if a topic has a schema, a producer that doesn't carry a schema is not allowed to connect to the topic and produce data.
+isSchemaValidationEnforced=true
+
# How often to check for topics that have reached the quota
backlogQuotaCheckIntervalInSeconds=60
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 5ac4122..e31246f 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -39,6 +39,9 @@ using namespace pulsar;
DECLARE_LOG_OBJECT()
+static std::string binaryLookupUrl = "pulsar://localhost:6650";
+static std::string httpLookupUrl = "http://localhost:8080";
+
TEST(LookupServiceTest, basicLookup) {
ExecutorServiceProviderPtr service =
std::make_shared<ExecutorServiceProvider>(1);
AuthenticationPtr authData = AuthFactory::Disabled();
@@ -274,3 +277,82 @@ TEST(LookupServiceTest, testTimeout) {
ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
}
+
+/**
+ * Value-parameterized fixture: GetParam() is the service URL the client is built from,
+ * and the client is closed after every test.
+ */
+class LookupServiceTest : public ::testing::TestWithParam<std::string> {
+ public:
+    LookupServiceTest() : client_(GetParam()) {}
+
+    void TearDown() override { client_.close(); }
+
+ protected:
+    Client client_;
+};
+
+TEST_P(LookupServiceTest, testGetSchema) {
+    // Topic is unique per run and per lookup flavour (first 4 chars of the service URL).
+    const std::string topic =
+        "testGetSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4);
+    std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+
+    StringMap properties;
+    properties.emplace("key1", "value1");
+    properties.emplace("key2", "value2");
+
+    // Create a producer with a JSON schema so the lookup below has a schema to return.
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setSchema(SchemaInfo(SchemaType::JSON, "json", jsonSchema, properties));
+    Producer producer;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer));
+
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
+    auto lookup = clientImplPtr->getLookup();
+
+    boost::optional<SchemaInfo> schemaInfo;
+    auto future = lookup->getSchema(TopicName::get(topic));
+    ASSERT_EQ(ResultOk, future.get(schemaInfo));
+    // Fail the test cleanly instead of dereferencing an empty optional below.
+    ASSERT_TRUE(schemaInfo);
+    ASSERT_EQ(jsonSchema, schemaInfo->getSchema());
+    ASSERT_EQ(SchemaType::JSON, schemaInfo->getSchemaType());
+    ASSERT_EQ(properties, schemaInfo->getProperties());
+}
+
+TEST_P(LookupServiceTest, testGetSchemaNotFund) {
+    // A producer created without any schema creates the topic but registers no schema on it.
+    const std::string topic =
+        "testGetSchemaNotFund" + std::to_string(time(nullptr)) + GetParam().substr(0, 4);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producer));
+
+    // Query the lookup service directly: a missing schema is reported as ResultOk with an
+    // empty optional, not as an error.
+    auto lookup = PulsarFriend::getClientImplPtr(client_)->getLookup();
+    boost::optional<SchemaInfo> schemaInfo;
+    ASSERT_EQ(ResultOk, lookup->getSchema(TopicName::get(topic)).get(schemaInfo));
+    ASSERT_FALSE(schemaInfo);
+}
+
+TEST_P(LookupServiceTest, testGetKeyValueSchema) {
+    // Register a KeyValue schema (JSON key + JSON value, INLINE encoding) via a producer, then
+    // check the lookup service returns the same merged schema payload.
+    const std::string topic =
+        "testGetKeyValueSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4);
+    StringMap properties;
+    properties.emplace("key1", "value1");
+    properties.emplace("key2", "value2");
+    std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo keySchema(JSON, "key-json", jsonSchema, properties);
+    SchemaInfo valueSchema(JSON, "value-json", jsonSchema, properties);
+    SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setSchema(keyValueSchema);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer));
+
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
+    auto lookup = clientImplPtr->getLookup();
+
+    boost::optional<SchemaInfo> schemaInfo;
+    auto future = lookup->getSchema(TopicName::get(topic));
+    ASSERT_EQ(ResultOk, future.get(schemaInfo));
+    // Fail the test cleanly instead of dereferencing an empty optional below.
+    ASSERT_TRUE(schemaInfo);
+    ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo->getSchema());
+    ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo->getSchemaType());
+    ASSERT_FALSE(schemaInfo->getProperties().empty());
+}
+
+// Run every TEST_P in LookupServiceTest twice: once over the binary protocol, once over HTTP lookup.
+INSTANTIATE_TEST_CASE_P(Pulsar, LookupServiceTest,
+                        ::testing::Values(binaryLookupUrl,
+                                          httpLookupUrl));
\ No newline at end of file
diff --git a/tests/SchemaTest.cc b/tests/SchemaTest.cc
index cf0f8c7..34e224e 100644
--- a/tests/SchemaTest.cc
+++ b/tests/SchemaTest.cc
@@ -19,12 +19,12 @@
#include <gtest/gtest.h>
#include <pulsar/Client.h>
+#include "PulsarFriend.h"
#include "SharedBuffer.h"
using namespace pulsar;
static std::string lookupUrl = "pulsar://localhost:6650";
-
static const std::string exampleSchema =
R"({"type":"record","name":"Example","namespace":"test","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})";
@@ -50,11 +50,10 @@ TEST(SchemaTest, testSchema) {
res = client.createProducer("topic-avro", producerConf, producer);
ASSERT_EQ(ResultIncompatibleSchema, res);
- // Creating producer with no schema on same topic should succeed
- // because standalone broker is configured by default to not
- // require the schema to be set
+ // Creating a producer with no schema on the same topic should fail,
+ // because the broker is configured with isSchemaValidationEnforced=true.
res = client.createProducer("topic-avro", producer);
- ASSERT_EQ(ResultOk, res);
+ ASSERT_EQ(ResultIncompatibleSchema, res);
ConsumerConfiguration consumerConf;
Consumer consumer;
@@ -154,3 +153,33 @@ TEST(SchemaTest, testValueSchemaIsEmpty) {
int valueSchemaSize = buffer.readUnsignedInt();
ASSERT_EQ(valueSchemaSize, -1);
}
+
+TEST(SchemaTest, testAutoDownloadSchema) {
+    const std::string topic = "testAutoPublicSchema" + std::to_string(time(nullptr));
+    std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo schema(JSON, "test-schema", jsonSchema);
+
+    Client client(lookupUrl);
+
+    // Subscribe with a JSON schema first, so the topic has a schema before any producer exists.
+    ConsumerConfiguration consumerConfiguration;
+    consumerConfiguration.setSchema(schema);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", consumerConfiguration, consumer));
+
+    // The producer configuration carries no schema; the trailing `true` asks createProducerAsync
+    // to download the topic's schema and use it (the autoDownloadSchema path).
+    ProducerConfiguration producerConfiguration;
+    Producer producer;
+
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+
+    Promise<Result, Producer> promise;
+    clientImplPtr->createProducerAsync(topic, producerConfiguration, WaitForCallbackValue<Producer>(promise),
+                                       true);
+    ASSERT_EQ(ResultOk, promise.getFuture().get(producer));
+
+    Message msg = MessageBuilder().setContent("content").build();
+    ASSERT_EQ(ResultOk, producer.send(msg));
+
+    // Bounded wait: if the message never arrives the test fails instead of hanging forever.
+    Message received;
+    ASSERT_EQ(ResultOk, consumer.receive(received, 5000));
+    ASSERT_EQ("content", received.getDataAsString());
+
+    client.close();
+}