This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a46da16 [feat] Support messages with generic types (#149)
a46da16 is described below
commit a46da16421323d146c33d1e9d00a0352b50ace89
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Jan 5 17:12:27 2023 +0800
[feat] Support messages with generic types (#149)
### Motivation
The Pulsar C++ client doesn't support schemas yet; it only supports
configuring a `SchemaInfo` when creating producers or consumers. The main
reason is that C++ templates are instantiated at compile time, so
templatizing `Producer`, `Consumer` or `Message` would require exposing
their internal implementation in public headers.
Currently, users might write the following code for serialization and
deserialization:
```c++
producer.send(MessageBuilder().setContent(encode(value)).build());
```
```c++
Message msg;
consumer.receive(msg);
auto value = decode(msg.getData(), msg.getSize());
```
However, `encode` and `decode` are just one possible approach; users
might instead use other interfaces, such as a class with two virtual
methods. There is currently no common interface for serialization and
deserialization.
### Modifications
Add a `TypedMessageBuilder<T>` class template that accepts an encoder
function and an optional validation function. The validation function is
for users who want to emulate the Java client's `AUTO_PRODUCE` schema; it
should throw an exception to reject an invalid payload. A full
specialization for the `std::string` template argument skips encoding.
Since `TypedMessageBuilder<T>` inherits from `MessageBuilder`, it is
compatible with the current code style:
```c++
// Note: `setValue` must be called before any method inherited from the base class
auto msg =
TypedMessageBuilder<T>(encoder).setValue(value).setPartitionKey(key).build();
```
Add a `TypedMessage<T>` class that only attaches a decoder to a `Message`
and can be constructed directly from a `Message`:
```c++
auto typedMsg = TypedMessage<T>(msg, decoder);
std::cout << typedMsg.getValue() << std::endl; // decode the bytes
std::cout << typedMsg.getMessageId() << std::endl; // call methods inherited from Message
```
For convenience, the following APIs are added to `Consumer`:
```c++
template <typename T>
Result receive(TypedMessage<T>& msg, typename TypedMessage<T>::Decoder decoder);
template <typename T>
Result receive(TypedMessage<T>& msg, int timeoutMs, typename TypedMessage<T>::Decoder decoder);
template <typename T>
void receiveAsync(std::function<void(Result result, const TypedMessage<T>&)> callback,
                  typename TypedMessage<T>::Decoder decoder);
```
`ConsumerConfiguration` can now be configured with a listener that
accepts a `TypedMessage<T>`:
```c++
template <typename T>
ConsumerConfiguration& setTypedMessageListener(
std::function<void(Consumer&, const TypedMessage<T>&)> listener,
typename TypedMessage<T>::Decoder decoder);
```
Since the typed listener is adapted into a regular `MessageListener` and
`Consumer` is only forward-declared in `ConsumerConfiguration.h`, the
first parameter of `MessageListener` is changed from `Consumer` to
`Consumer&`. This is an API change, but it is source-compatible: a
`std::function<void(Consumer, const Message&)>` converts implicitly to a
`std::function<void(Consumer&, const Message&)>`.
With these API changes, a schema extension can be written as a separate
header-only C++ library.
---
include/pulsar/Consumer.h | 25 ++++++
include/pulsar/ConsumerConfiguration.h | 12 ++-
include/pulsar/Message.h | 2 +-
include/pulsar/MessageBuilder.h | 5 +-
include/pulsar/TypedMessage.h | 49 ++++++++++++
include/pulsar/TypedMessageBuilder.h | 79 +++++++++++++++++++
lib/ConsumerImpl.cc | 3 +-
lib/MessageBuilder.cc | 12 +++
lib/MultiTopicsConsumerImpl.cc | 3 +-
tests/TypedMessageTest.cc | 136 +++++++++++++++++++++++++++++++++
10 files changed, 321 insertions(+), 5 deletions(-)
diff --git a/include/pulsar/Consumer.h b/include/pulsar/Consumer.h
index d35defd..6596894 100644
--- a/include/pulsar/Consumer.h
+++ b/include/pulsar/Consumer.h
@@ -21,6 +21,7 @@
#include <pulsar/BrokerConsumerStats.h>
#include <pulsar/ConsumerConfiguration.h>
+#include <pulsar/TypedMessage.h>
#include <pulsar/defines.h>
#include <iostream>
@@ -91,6 +92,14 @@ class PULSAR_PUBLIC Consumer {
*/
Result receive(Message& msg);
+    /**
+     * Receive a single message and attach `decoder` to it.
+     *
+     * Delegates to receive(Message&). `msg` is overwritten whatever Result is returned,
+     * so check the Result before calling msg.getValue().
+     *
+     * @param msg the typed message that receives the result
+     * @param decoder converts the payload bytes into a T when msg.getValue() is called
+     * @return the Result of receive(Message&)
+     */
+    template <typename T>
+    Result receive(TypedMessage<T>& msg, typename TypedMessage<T>::Decoder decoder) {
+        Message received;
+        const Result status = receive(received);
+        msg = TypedMessage<T>(received, decoder);
+        return status;
+    }
+
/**
*
* @param msg a non-const reference where the received message will be
copied
@@ -101,6 +110,14 @@ class PULSAR_PUBLIC Consumer {
*/
Result receive(Message& msg, int timeoutMs);
+    /**
+     * Receive a single message, waiting at most `timeoutMs`, and attach `decoder` to it.
+     *
+     * Delegates to receive(Message&, int). `msg` is overwritten whatever Result is returned,
+     * so check the Result before calling msg.getValue().
+     *
+     * @param msg the typed message that receives the result
+     * @param timeoutMs the timeout in milliseconds, forwarded to receive(Message&, int)
+     * @param decoder converts the payload bytes into a T when msg.getValue() is called
+     * @return the Result of receive(Message&, int)
+     */
+    template <typename T>
+    Result receive(TypedMessage<T>& msg, int timeoutMs, typename TypedMessage<T>::Decoder decoder) {
+        Message received;
+        const Result status = receive(received, timeoutMs);
+        msg = TypedMessage<T>(received, decoder);
+        return status;
+    }
+
/**
* Receive a single message
* <p>
@@ -114,6 +131,14 @@ class PULSAR_PUBLIC Consumer {
*/
void receiveAsync(ReceiveCallback callback);
+    /**
+     * Asynchronously receive a single message; `callback` gets it wrapped with `decoder`.
+     *
+     * T appears only inside std::function parameters, so it cannot be deduced from a lambda:
+     * call it as `consumer.receiveAsync<int>(callback, decoder)`.
+     *
+     * @param callback invoked with the receive Result and the typed message
+     * @param decoder attached to the delivered message and used by TypedMessage<T>::getValue()
+     */
+    template <typename T>
+    void receiveAsync(std::function<void(Result result, const TypedMessage<T>&)> callback,
+                      typename TypedMessage<T>::Decoder decoder) {
+        ReceiveCallback rawCallback = [callback, decoder](Result result, const Message& msg) {
+            callback(result, TypedMessage<T>(msg, decoder));
+        };
+        receiveAsync(rawCallback);
+    }
+
/**
* Batch receiving messages.
*
diff --git a/include/pulsar/ConsumerConfiguration.h
b/include/pulsar/ConsumerConfiguration.h
index 520901c..70a52aa 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -28,6 +28,7 @@
#include <pulsar/Message.h>
#include <pulsar/Result.h>
#include <pulsar/Schema.h>
+#include <pulsar/TypedMessage.h>
#include <pulsar/defines.h>
#include <functional>
@@ -48,7 +49,7 @@ typedef std::function<void(Result, const Messages& msgs)>
BatchReceiveCallback;
typedef std::function<void(Result result, MessageId messageId)>
GetLastMessageIdCallback;
/// Callback definition for MessageListener
-typedef std::function<void(Consumer consumer, const Message& msg)>
MessageListener;
+typedef std::function<void(Consumer& consumer, const Message& msg)>
MessageListener;  // Consumer by reference; a callable taking Consumer by value still converts to this type
typedef std::shared_ptr<ConsumerEventListener> ConsumerEventListenerPtr;
@@ -126,6 +127,15 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
ConsumerConfiguration& setMessageListener(MessageListener messageListener);
+    /**
+     * Configure a listener that receives each message wrapped as a TypedMessage<T>.
+     *
+     * `listener` is adapted into a plain MessageListener and passed to setMessageListener().
+     * T cannot be deduced from a lambda, so call it as
+     * `conf.setTypedMessageListener<int>(listener, decoder)`.
+     *
+     * @param listener invoked with the consumer and the typed message
+     * @param decoder attached to every delivered message and used by TypedMessage<T>::getValue()
+     * @return the result of setMessageListener()
+     */
+    template <typename T>
+    ConsumerConfiguration& setTypedMessageListener(
+        std::function<void(Consumer&, const TypedMessage<T>&)> listener,
+        typename TypedMessage<T>::Decoder decoder) {
+        MessageListener adapter = [listener, decoder](Consumer& consumer, const Message& msg) {
+            const TypedMessage<T> typed(msg, decoder);
+            listener(consumer, typed);
+        };
+        return setMessageListener(adapter);
+    }
+
/**
* @return the message listener
*/
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 77f30d4..f9e037e 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -183,7 +183,7 @@ class PULSAR_PUBLIC Message {
bool operator==(const Message& msg) const;
- private:
+ protected:
typedef std::shared_ptr<MessageImpl> MessageImplPtr;
MessageImplPtr impl_;
diff --git a/include/pulsar/MessageBuilder.h b/include/pulsar/MessageBuilder.h
index a668dd4..c2f089f 100644
--- a/include/pulsar/MessageBuilder.h
+++ b/include/pulsar/MessageBuilder.h
@@ -162,8 +162,11 @@ class PULSAR_PUBLIC MessageBuilder {
*/
MessageBuilder& create();
+ protected:  // payload accessors exposed to subclasses
+ const char* data() const;  // bytes of the payload stored so far; asserts the payload is non-null
+ std::size_t size() const;  // readable length of that payload; same non-null assertion as data()
+
private:
- MessageBuilder(const MessageBuilder&);
void checkMetadata();
static std::shared_ptr<MessageImpl> createMessageImpl();
Message::MessageImplPtr impl_;
diff --git a/include/pulsar/TypedMessage.h b/include/pulsar/TypedMessage.h
new file mode 100644
index 0000000..63cdf66
--- /dev/null
+++ b/include/pulsar/TypedMessage.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/Message.h>
+
+#include <functional>
+
+namespace pulsar {
+
+/**
+ * A Message with an attached decoder that turns its payload into a T.
+ *
+ * TypedMessage<T> only adds the decoder; every other accessor is inherited from Message.
+ */
+template <typename T>
+class TypedMessage : public Message {
+   public:
+    /// Converts the raw payload (pointer and length) into a T.
+    using Decoder = std::function<T(const char*, std::size_t)>;
+
+    /// A TypedMessage without a decoder; assign a real message before calling getValue().
+    TypedMessage() = default;
+
+    /**
+     * Wrap `message` with `decoder`.
+     *
+     * This constructor is not explicit, so a Message converts implicitly to a TypedMessage<T>
+     * (e.g. a callback taking `const TypedMessage<T>&` accepts a Message). The default decoder
+     * ignores the payload and returns T{}, so supply a real decoder whenever the value matters.
+     */
+    TypedMessage(
+        const Message& message, Decoder decoder = [](const char*, std::size_t) { return T{}; })
+        : Message(message), decoder_(std::move(decoder)) {}
+
+    /// Decode the payload with the attached decoder.
+    T getValue() const { return decoder_(static_cast<const char*>(getData()), getLength()); }
+
+    /// Replace the decoder used by getValue(); returns *this for chaining.
+    TypedMessage& setDecoder(Decoder decoder) {
+        // Move instead of copy: copying a std::function may allocate.
+        decoder_ = std::move(decoder);
+        return *this;
+    }
+
+   private:
+    Decoder decoder_;
+};
+
+} // namespace pulsar
diff --git a/include/pulsar/TypedMessageBuilder.h
b/include/pulsar/TypedMessageBuilder.h
new file mode 100644
index 0000000..74d1064
--- /dev/null
+++ b/include/pulsar/TypedMessageBuilder.h
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/MessageBuilder.h>
+
+#include <functional>
+
+namespace pulsar {
+
+/**
+ * A MessageBuilder that serializes a value of type T into the message payload.
+ *
+ * Call setValue() before any method inherited from MessageBuilder: the inherited methods
+ * return a MessageBuilder&, which has no setValue().
+ */
+template <typename T>
+class TypedMessageBuilder : public MessageBuilder {
+   public:
+    /// Serializes a value into the bytes stored as the message payload.
+    using Encoder = std::function<std::string(const T&)>;
+    /// Checks an encoded payload; it should throw an exception to reject the payload.
+    using Validator = std::function<void(const char* data, size_t)>;
+
+    /**
+     * @param encoder serializes the values passed to setValue()
+     * @param validator checks every encoded payload; the default accepts everything
+     */
+    TypedMessageBuilder(
+        Encoder encoder, Validator validator = [](const char*, std::size_t) {})
+        : encoder_(std::move(encoder)), validator_(std::move(validator)) {}
+
+    /**
+     * Encode `value` and use the result as the message content.
+     *
+     * The encoded bytes are validated before they are stored. If the validator throws, the
+     * exception propagates and the builder's previous content is left untouched. This matches
+     * the std::string specialization.
+     */
+    TypedMessageBuilder& setValue(const T& value) {
+        std::string encoded = encoder_(value);
+        if (validator_) {
+            validator_(encoded.data(), encoded.size());
+        }
+        setContent(std::move(encoded));
+        return *this;
+    }
+
+   private:
+    const Encoder encoder_;
+    const Validator validator_;
+};
+
+/**
+ * Specialization for raw bytes: the std::string value is used as the payload without encoding.
+ */
+template <>
+class TypedMessageBuilder<std::string> : public MessageBuilder {
+   public:
+    // The validator should throw an exception to indicate the message is corrupted.
+    using Validator = std::function<void(const char* data, size_t)>;
+
+    /// @param validator optional check applied to every value before it is stored
+    TypedMessageBuilder(Validator validator = nullptr) : validator_(validator) {}
+
+    /// Validate `value` (if a validator is set), then copy it into the content.
+    TypedMessageBuilder& setValue(const std::string& value) {
+        validate(value);
+        setContent(value);
+        return *this;
+    }
+
+    /// Validate `value` (if a validator is set), then move it into the content.
+    TypedMessageBuilder& setValue(std::string&& value) {
+        validate(value);
+        setContent(std::move(value));
+        return *this;
+    }
+
+   private:
+    Validator validator_;
+
+    // Run the validator, when one was supplied, over the raw bytes of `value`.
+    void validate(const std::string& value) const {
+        if (!validator_) {
+            return;
+        }
+        validator_(value.data(), value.size());
+    }
+};
+using BytesMessageBuilder = TypedMessageBuilder<std::string>;
+
+} // namespace pulsar
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 6f2c211..b1a620a 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -773,7 +773,8 @@ void ConsumerImpl::internalListener() {
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
lastDequedMessageId_ = msg.getMessageId();
- messageListener_(Consumer(get_shared_this_ptr()), msg);
+ Consumer consumer{get_shared_this_ptr()};
+ messageListener_(consumer, msg);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
}
diff --git a/lib/MessageBuilder.cc b/lib/MessageBuilder.cc
index b33394e..ae56d10 100644
--- a/lib/MessageBuilder.cc
+++ b/lib/MessageBuilder.cc
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+#include <assert.h>
#include <pulsar/MessageBuilder.h>
#include <memory>
@@ -155,4 +156,15 @@ MessageBuilder& MessageBuilder::disableReplication(bool
flag) {
r.Swap(impl_->metadata.mutable_replicate_to());
return *this;
}
+
+/// Bytes of the payload currently stored in the builder, exposed to subclasses.
+const char* MessageBuilder::data() const {
+    const char* const payload = impl_->payload.data();
+    assert(payload);  // a non-null payload is expected (presumably set via setContent)
+    return payload;
+}
+
+/// Number of readable payload bytes currently stored in the builder.
+size_t MessageBuilder::size() const {
+    assert(impl_->payload.data());  // same precondition as data()
+    const size_t length = impl_->payload.readableBytes();
+    return length;
+}
+
} // namespace pulsar
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index d9d3873..a013566 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -539,7 +539,8 @@ void MultiTopicsConsumerImpl::internalListener(Consumer
consumer) {
Message m;
incomingMessages_.pop(m);
try {
- messageListener_(Consumer(get_shared_this_ptr()), m);
+ Consumer self{get_shared_this_ptr()};
+ messageListener_(self, m);
messageProcessed(m);
} catch (const std::exception& e) {
LOG_ERROR("Exception thrown from listener of Partitioned Consumer" <<
e.what());
diff --git a/tests/TypedMessageTest.cc b/tests/TypedMessageTest.cc
new file mode 100644
index 0000000..9a61bfe
--- /dev/null
+++ b/tests/TypedMessageTest.cc
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/MessageBuilder.h>
+#include <pulsar/TypedMessage.h>
+#include <pulsar/TypedMessageBuilder.h>
+
+#include <mutex>
+#include <vector>
+
+#include "WaitUtils.h"
+#include "lib/Latch.h"
+
+using namespace pulsar;
+
+// Broker service URL used by every test in this file.
+static std::string lookupUrl = "pulsar://localhost:6650";
+
+// Defined elsewhere in the test sources; presumably returns a unique topic-name suffix.
+extern std::string unique_str();
+
+using IntMessageBuilder = TypedMessageBuilder<int>;
+
+// ints travel as their decimal text representation.
+static auto intEncoder = [](int value) -> std::string { return std::to_string(value); };
+static auto intDecoder = [](const char* bytes, size_t length) -> int {
+    const std::string text(bytes, length);
+    return std::stoi(text);
+};
+
+TEST(TypedMessageTest, testReceive) {
+    const auto topic = "typed-message-test-receive-" + unique_str();
+    Client client(lookupUrl);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    std::vector<MessageId> messageIds;
+    constexpr int numMessages = 100;
+    messageIds.reserve(numMessages);
+
+    for (int i = 0; i < numMessages; i++) {
+        MessageId messageId;
+        auto msg = TypedMessageBuilder<int>{intEncoder}.setValue(i).build();
+        ASSERT_EQ(ResultOk, producer.send(msg, messageId));
+        messageIds.emplace_back(messageId);
+    }
+    // Rotate through the three typed receive APIs: blocking, blocking with timeout, async.
+    for (int i = 0; i < numMessages; i++) {
+        TypedMessage<int> msg;
+        if (i % 3 == 0) {
+            ASSERT_EQ(ResultOk, consumer.receive(msg, intDecoder));
+        } else if (i % 3 == 1) {
+            ASSERT_EQ(ResultOk, consumer.receive(msg, 3000, intDecoder));
+        } else {
+            // State shared with the callback through a shared_ptr, so a callback that fires
+            // after a timed-out wait never touches this stack frame.
+            struct AsyncState {
+                std::mutex mutex;
+                Result result{ResultUnknownError};
+                TypedMessage<int> msg;
+                Latch latch{1};
+            };
+            auto state = std::make_shared<AsyncState>();
+            // Exercise the typed overload; T must be named because it can't be deduced from a lambda.
+            consumer.receiveAsync<int>(
+                [state](Result result, const TypedMessage<int>& receivedMsg) {
+                    {
+                        std::lock_guard<std::mutex> lock{state->mutex};
+                        state->result = result;
+                        state->msg = receivedMsg;
+                    }
+                    state->latch.countdown();
+                },
+                intDecoder);
+            ASSERT_TRUE(state->latch.wait(std::chrono::seconds(1)));
+            // Check the result on the test thread: fatal assertions must not run in the callback.
+            std::lock_guard<std::mutex> lock{state->mutex};
+            ASSERT_EQ(ResultOk, state->result);
+            msg = state->msg;
+        }
+
+        ASSERT_EQ(msg.getValue(), i);
+        ASSERT_EQ(msg.getMessageId(), messageIds[i]);
+    }
+
+    client.close();
+}
+
+TEST(TypedMessageTest, testListener) {
+    const auto topic = "typed-message-test-listener-" + unique_str();
+    Client client(lookupUrl);
+
+    ConsumerConfiguration conf;
+    std::vector<int> values;
+    std::mutex valuesMutex;
+    // T cannot be deduced from a lambda, hence the explicit <int>.
+    conf.setTypedMessageListener<int>(
+        [&values, &valuesMutex](Consumer&, const TypedMessage<int>& msg) {
+            std::lock_guard<std::mutex> lock{valuesMutex};
+            values.emplace_back(msg.getValue());
+        },
+        intDecoder);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", conf, consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    constexpr int numMessages = 100;
+    for (int i = 0; i < numMessages; i++) {
+        auto msg = TypedMessageBuilder<int>{intEncoder}.setValue(i).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    // Compare sizes as size_t to avoid a signed/unsigned mismatch.
+    const size_t expectedSize = numMessages;
+    waitUntil(std::chrono::seconds(3), [&values, &valuesMutex, expectedSize] {
+        std::lock_guard<std::mutex> lock{valuesMutex};
+        return values.size() == expectedSize;
+    });
+    // Close the client while `values` and `valuesMutex`, which the listener captures by
+    // reference, are still alive.
+    client.close();
+
+    std::lock_guard<std::mutex> lock{valuesMutex};
+    ASSERT_EQ(values.size(), expectedSize);
+    for (int i = 0; i < numMessages; i++) {
+        ASSERT_EQ(values[i], i);
+    }
+}
+
+TEST(TypedMessageTest, testValidate) {
+    // Reject any payload longer than three bytes.
+    auto validator = [](const char* data, size_t size) {
+        if (size > 3) {
+            throw std::runtime_error(std::to_string(size));
+        }
+    };
+
+    // Generic builder: the validator sees the encoded (decimal text) form of the value.
+    auto encoder = [](int x) { return std::to_string(x); };
+    IntMessageBuilder intMessageBuilder{encoder, validator};
+    ASSERT_NO_THROW(intMessageBuilder.setValue(1));
+    ASSERT_THROW(intMessageBuilder.setValue(1234), std::runtime_error);
+    ASSERT_NO_THROW(intMessageBuilder.setValue(123));
+
+    // Bytes builder: validation happens in setValue() (setContent() bypasses it), for both
+    // the rvalue and the const-lvalue overloads.
+    BytesMessageBuilder bytesMessageBuilder{validator};
+    ASSERT_NO_THROW(bytesMessageBuilder.setValue(std::string("123")));
+    ASSERT_THROW(bytesMessageBuilder.setValue(std::string("1234")), std::runtime_error);
+    const std::string tooLong = "12345";
+    ASSERT_THROW(bytesMessageBuilder.setValue(tooLong), std::runtime_error);
+}