This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a46da16  [feat] Support messages with generic types (#149)
a46da16 is described below

commit a46da16421323d146c33d1e9d00a0352b50ace89
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Jan 5 17:12:27 2023 +0800

    [feat] Support messages with generic types (#149)
    
    ### Motivation
    
    The Pulsar C++ client doesn't support schemas yet. It only supports
    configuring `SchemaInfo` when creating producers or consumers. The main
    reason is that C++ templates are instantiated at compile time, so
    templatizing `Producer`, `Consumer` or `Message` would require exposing
    all of their internal code in the public headers.
    
    Currently, users might write the following code for serialization and
    deserialization:
    
    ```c++
    producer.send(MessageBuilder().setContent(encode(value)).build());
    ```
    
    ```c++
    Message msg;
    consumer.receive(msg);
    auto value = decode(msg.getData(), msg.getSize());
    ```
    
    However, `encode` and `decode` are just one possible approach; users
    might use other interfaces instead, such as a class with two virtual
    methods. There is currently no common interface for serialization and
    deserialization.
    
    ### Modifications
    
    Add a `TypedMessageBuilder<T>` class template that accepts an encoder
    function and a validation function. The validation function is used when
    users want to simulate the Java client's `AUTO_PRODUCE` schema. Define a
    full specialization for the `std::string` template argument, which needs
    no encoding. Since it inherits from `MessageBuilder`, it's compatible with
    the current code style:
    
    ```c++
    // `setValue` must be called before the methods inherited from `MessageBuilder`,
    // because those return `MessageBuilder&`, which has no `setValue`
    auto msg = TypedMessageBuilder<T>(encoder).setValue(value).setPartitionKey(key).build();
    ```
    
    Add a `TypedMessage<T>` class that only adds a decoder to a `Message`
    and can be constructed directly from a `Message`:
    
    ```c++
    auto typedMsg = TypedMessage<T>(msg, decoder);
    std::cout << typedMsg.getValue() << std::endl;  // decode the bytes
    std::cout << typedMsg.getMessageId() << std::endl;  // call methods from Message
    ```
    
    For convenience, the following APIs are added to `Consumer`:
    
    ```c++
    template <typename T>
    Result receive(TypedMessage<T>& msg, typename TypedMessage<T>::Decoder decoder);
    
    template <typename T>
    Result receive(TypedMessage<T>& msg, int timeoutMs, typename TypedMessage<T>::Decoder decoder);
    
    template <typename T>
    void receiveAsync(std::function<void(Result result, const TypedMessage<T>&)> callback,
                      typename TypedMessage<T>::Decoder decoder);
    ```
    
    `T` cannot be deduced from a lambda argument, so `receiveAsync` (like
    `setTypedMessageListener` below) must be called with an explicit template
    argument, e.g. `consumer.receiveAsync<T>(callback, decoder)`.
    
    The `ConsumerConfiguration` can configure a listener that accepts a
    `TypedMessage<T>` now:
    
    ```c++
    template <typename T>
    ConsumerConfiguration& setTypedMessageListener(
            std::function<void(Consumer&, const TypedMessage<T>&)> listener,
            typename TypedMessage<T>::Decoder decoder);
    ```
    
    Since it calls the original listener and `Consumer` is only forward
    declared in `ConsumerConfiguration.h`, the first argument is changed from
    `Consumer` to `Consumer&`. This is an API change, but it's backward
    compatible because a `std::function<void(Consumer, ...)>` can be
    implicitly converted to a `std::function<void(Consumer&, ...)>`.
    
    Based on these API changes, a separate header-only C++ library can be
    written as the schema extension.
---
 include/pulsar/Consumer.h              |  25 ++++++
 include/pulsar/ConsumerConfiguration.h |  12 ++-
 include/pulsar/Message.h               |   2 +-
 include/pulsar/MessageBuilder.h        |   5 +-
 include/pulsar/TypedMessage.h          |  49 ++++++++++++
 include/pulsar/TypedMessageBuilder.h   |  79 +++++++++++++++++++
 lib/ConsumerImpl.cc                    |   3 +-
 lib/MessageBuilder.cc                  |  12 +++
 lib/MultiTopicsConsumerImpl.cc         |   3 +-
 tests/TypedMessageTest.cc              | 136 +++++++++++++++++++++++++++++++++
 10 files changed, 321 insertions(+), 5 deletions(-)

diff --git a/include/pulsar/Consumer.h b/include/pulsar/Consumer.h
index d35defd..6596894 100644
--- a/include/pulsar/Consumer.h
+++ b/include/pulsar/Consumer.h
@@ -21,6 +21,7 @@
 
 #include <pulsar/BrokerConsumerStats.h>
 #include <pulsar/ConsumerConfiguration.h>
+#include <pulsar/TypedMessage.h>
 #include <pulsar/defines.h>
 
 #include <iostream>
@@ -91,6 +92,14 @@ class PULSAR_PUBLIC Consumer {
      */
     Result receive(Message& msg);
 
+    template <typename T>
+    Result receive(TypedMessage<T>& msg, typename TypedMessage<T>::Decoder decoder) {
+        Message rawMsg;  // received untyped, then wrapped together with `decoder`
+        auto result = receive(rawMsg);
+        msg = TypedMessage<T>{rawMsg, std::move(decoder)};  // assigned even if result != ResultOk
+        return result;
+    }
+
     /**
      *
      * @param msg a non-const reference where the received message will be copied
@@ -101,6 +110,14 @@ class PULSAR_PUBLIC Consumer {
      */
     Result receive(Message& msg, int timeoutMs);
 
+    template <typename T>
+    Result receive(TypedMessage<T>& msg, int timeoutMs, typename TypedMessage<T>::Decoder decoder) {
+        Message rawMsg;  // received untyped, then wrapped together with `decoder`
+        auto result = receive(rawMsg, timeoutMs);
+        msg = TypedMessage<T>{rawMsg, std::move(decoder)};  // assigned even if result != ResultOk
+        return result;
+    }
+
     /**
      * Receive a single message
      * <p>
@@ -114,6 +131,14 @@ class PULSAR_PUBLIC Consumer {
      */
     void receiveAsync(ReceiveCallback callback);
 
+    template <typename T>  // T must be given explicitly: it cannot be deduced from a lambda argument
+    void receiveAsync(std::function<void(Result result, const TypedMessage<T>&)> callback,
+                      typename TypedMessage<T>::Decoder decoder) {
+        receiveAsync([callback, decoder](Result result, const Message& msg) {
+            callback(result, TypedMessage<T>{msg, decoder});
+        });
+    }
+
     /**
      * Batch receiving messages.
      *
diff --git a/include/pulsar/ConsumerConfiguration.h b/include/pulsar/ConsumerConfiguration.h
index 520901c..70a52aa 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -28,6 +28,7 @@
 #include <pulsar/Message.h>
 #include <pulsar/Result.h>
 #include <pulsar/Schema.h>
+#include <pulsar/TypedMessage.h>
 #include <pulsar/defines.h>
 
 #include <functional>
@@ -48,7 +49,7 @@ typedef std::function<void(Result, const Messages& msgs)> BatchReceiveCallback;
 typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
 
 /// Callback definition for MessageListener
-typedef std::function<void(Consumer consumer, const Message& msg)> MessageListener;
+typedef std::function<void(Consumer& consumer, const Message& msg)> MessageListener;
 
 typedef std::shared_ptr<ConsumerEventListener> ConsumerEventListenerPtr;
 
@@ -126,6 +127,15 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     ConsumerConfiguration& setMessageListener(MessageListener messageListener);
 
+    template <typename T>  // T must be given explicitly: it cannot be deduced from a lambda argument
+    ConsumerConfiguration& setTypedMessageListener(
+        std::function<void(Consumer&, const TypedMessage<T>&)> listener,
+        typename TypedMessage<T>::Decoder decoder) {
+        return setMessageListener([listener, decoder](Consumer& consumer, const Message& msg) {
+            listener(consumer, TypedMessage<T>{msg, decoder});
+        });
+    }
+
     /**
      * @return the message listener
      */
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 77f30d4..f9e037e 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -183,7 +183,7 @@ class PULSAR_PUBLIC Message {
 
     bool operator==(const Message& msg) const;
 
-   private:
+   protected:
     typedef std::shared_ptr<MessageImpl> MessageImplPtr;
     MessageImplPtr impl_;
 
diff --git a/include/pulsar/MessageBuilder.h b/include/pulsar/MessageBuilder.h
index a668dd4..c2f089f 100644
--- a/include/pulsar/MessageBuilder.h
+++ b/include/pulsar/MessageBuilder.h
@@ -162,8 +162,11 @@ class PULSAR_PUBLIC MessageBuilder {
      */
     MessageBuilder& create();
 
+   protected:
+    const char* data() const;
+    std::size_t size() const;
+
    private:
-    MessageBuilder(const MessageBuilder&);
     void checkMetadata();
     static std::shared_ptr<MessageImpl> createMessageImpl();
     Message::MessageImplPtr impl_;
diff --git a/include/pulsar/TypedMessage.h b/include/pulsar/TypedMessage.h
new file mode 100644
index 0000000..63cdf66
--- /dev/null
+++ b/include/pulsar/TypedMessage.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/Message.h>
+
+#include <functional>
+
+namespace pulsar {
+
+template <typename T>
+class TypedMessage : public Message {
+   public:
+    using Decoder = std::function<T(const char*, std::size_t)>;
+
+    TypedMessage() = default;  // no decoder: getValue() throws std::bad_function_call until setDecoder()
+
+    TypedMessage(
+        const Message& message, Decoder decoder = [](const char*, std::size_t) { return T{}; })
+        : Message(message), decoder_(std::move(decoder)) {}
+
+    T getValue() const { return decoder_(static_cast<const char*>(getData()), getLength()); }
+
+    TypedMessage& setDecoder(Decoder decoder) {
+        decoder_ = std::move(decoder);
+        return *this;
+    }
+
+   private:
+    Decoder decoder_;
+};
+
+}  // namespace pulsar
diff --git a/include/pulsar/TypedMessageBuilder.h b/include/pulsar/TypedMessageBuilder.h
new file mode 100644
index 0000000..74d1064
--- /dev/null
+++ b/include/pulsar/TypedMessageBuilder.h
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/MessageBuilder.h>
+
+#include <functional>
+
+namespace pulsar {
+
+template <typename T>
+class TypedMessageBuilder : public MessageBuilder {
+   public:
+    using Encoder = std::function<std::string(const T&)>;
+    using Validator = std::function<void(const char* data, std::size_t)>;
+
+    TypedMessageBuilder(
+        Encoder encoder, Validator validator = [](const char*, std::size_t) {})
+        : encoder_(std::move(encoder)), validator_(std::move(validator)) {}
+
+    TypedMessageBuilder& setValue(const T& value) {
+        // Validate before storing, so a throwing validator leaves the previous content untouched.
+        std::string content = encoder_(value);
+        if (validator_) validator_(content.data(), content.size());
+        setContent(std::move(content));
+        return *this;
+    }
+
+   private:
+    Encoder encoder_;
+    Validator validator_;
+};
+
+template <>
+class TypedMessageBuilder<std::string> : public MessageBuilder {
+   public:
+    // The validator should throw an exception to indicate the message is corrupted.
+    using Validator = std::function<void(const char* data, std::size_t)>;
+
+    TypedMessageBuilder(Validator validator = nullptr) : validator_(std::move(validator)) {}
+
+    TypedMessageBuilder& setValue(const std::string& value) {
+        if (validator_) {
+            validator_(value.data(), value.size());
+        }
+        setContent(value);
+        return *this;
+    }
+
+    TypedMessageBuilder& setValue(std::string&& value) {
+        if (validator_) {
+            validator_(value.data(), value.size());
+        }
+        setContent(std::move(value));
+        return *this;
+    }
+
+   private:
+    Validator validator_;
+};
+using BytesMessageBuilder = TypedMessageBuilder<std::string>;
+
+}  // namespace pulsar
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 6f2c211..b1a620a 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -773,7 +773,8 @@ void ConsumerImpl::internalListener() {
     try {
         consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
         lastDequedMessageId_ = msg.getMessageId();
-        messageListener_(Consumer(get_shared_this_ptr()), msg);
+        Consumer consumer{get_shared_this_ptr()};
+        messageListener_(consumer, msg);
     } catch (const std::exception& e) {
         LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
     }
diff --git a/lib/MessageBuilder.cc b/lib/MessageBuilder.cc
index b33394e..ae56d10 100644
--- a/lib/MessageBuilder.cc
+++ b/lib/MessageBuilder.cc
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <assert.h>
 #include <pulsar/MessageBuilder.h>
 
 #include <memory>
@@ -155,4 +156,15 @@ MessageBuilder& MessageBuilder::disableReplication(bool flag) {
     r.Swap(impl_->metadata.mutable_replicate_to());
     return *this;
 }
+
+const char* MessageBuilder::data() const {  // start of the current payload's bytes
+    assert(impl_->payload.data());           // debug-only check that the payload pointer is non-null
+    return impl_->payload.data();
+}
+
+size_t MessageBuilder::size() const {  // number of readable bytes in the current payload
+    assert(impl_->payload.data());     // same debug-only non-null check as data()
+    return impl_->payload.readableBytes();
+}
+
 }  // namespace pulsar
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index d9d3873..a013566 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -539,7 +539,8 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
     Message m;
     incomingMessages_.pop(m);
     try {
-        messageListener_(Consumer(get_shared_this_ptr()), m);
+        Consumer self{get_shared_this_ptr()};
+        messageListener_(self, m);
         messageProcessed(m);
     } catch (const std::exception& e) {
         LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << 
e.what());
diff --git a/tests/TypedMessageTest.cc b/tests/TypedMessageTest.cc
new file mode 100644
index 0000000..9a61bfe
--- /dev/null
+++ b/tests/TypedMessageTest.cc
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/MessageBuilder.h>
+#include <pulsar/TypedMessage.h>
+#include <pulsar/TypedMessageBuilder.h>
+
+#include <mutex>
+#include <vector>
+
+#include "WaitUtils.h"
+#include "lib/Latch.h"
+
+using namespace pulsar;
+
+static std::string lookupUrl = "pulsar://localhost:6650";
+
+extern std::string unique_str();
+
+using IntMessageBuilder = TypedMessageBuilder<int>;
+static auto intEncoder = [](int x) { return std::to_string(x); };
+static auto intDecoder = [](const char* data, size_t size) { return std::stoi(std::string(data, size)); };
+
+TEST(TypedMessageTest, testReceive) {
+    const auto topic = "typed-message-test-receive-" + unique_str();
+    Client client(lookupUrl);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    std::vector<MessageId> messageIds;
+    constexpr int numMessages = 100;
+    for (int i = 0; i < numMessages; i++) {
+        MessageId messageId;
+        auto msg = TypedMessageBuilder<int>{intEncoder}.setValue(i).build();
+        ASSERT_EQ(ResultOk, producer.send(msg, messageId));
+        messageIds.emplace_back(messageId);
+    }
+    for (int i = 0; i < numMessages; i++) {
+        TypedMessage<int> msg;
+        // ensure the thread safety for `msg`, which could be modified in the callback of `receiveAsync`
+        std::mutex msgMutex;
+        if (i % 3 == 0) {
+            ASSERT_EQ(ResultOk, consumer.receive(msg, intDecoder));
+        } else if (i % 3 == 1) {
+            ASSERT_EQ(ResultOk, consumer.receive(msg, 3000, intDecoder));
+        } else {
+            Latch latch{1};
+            consumer.receiveAsync<int>(  // explicit <int>: T cannot be deduced from the lambda
+                [&latch, &msg, &msgMutex](Result result, const TypedMessage<int>& receivedMsg) {
+                    ASSERT_EQ(ResultOk, result);
+                    {
+                        std::lock_guard<std::mutex> lock{msgMutex};
+                        msg = receivedMsg;  // carries the decoder passed to receiveAsync
+                    }
+                    latch.countdown();
+                },
+                intDecoder);
+            ASSERT_TRUE(latch.wait(std::chrono::seconds(3)));
+        }
+
+        std::lock_guard<std::mutex> lock{msgMutex};
+        ASSERT_EQ(msg.getValue(), i);
+        ASSERT_EQ(msg.getMessageId(), messageIds[i]);
+    }
+
+    client.close();
+}
+
+TEST(TypedMessageTest, testListener) {
+    const auto topic = "typed-message-test-listener-" + unique_str();
+    Client client(lookupUrl);
+
+    ConsumerConfiguration conf;
+    std::vector<int> values;
+    std::mutex valuesMutex;
+    conf.setTypedMessageListener<int>(
+        [&values, &valuesMutex](Consumer&, const TypedMessage<int>& msg) {
+            std::lock_guard<std::mutex> lock{valuesMutex};
+            values.emplace_back(msg.getValue());
+        },
+        intDecoder);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", conf, consumer));
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    constexpr int numMessages = 100;
+    for (int i = 0; i < numMessages; i++) {
+        auto msg = TypedMessageBuilder<int>{intEncoder}.setValue(i).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    waitUntil(std::chrono::seconds(3), [&values, &valuesMutex] {
+        std::lock_guard<std::mutex> lock{valuesMutex};
+        return values.size() == numMessages;
+    });
+    client.close();  // close before the final checks, as testReceive does
+    std::lock_guard<std::mutex> lock{valuesMutex};
+    ASSERT_EQ(values.size(), numMessages);
+    for (int i = 0; i < numMessages; i++) {
+        ASSERT_EQ(values[i], i);
+    }
+}
+
+TEST(TypedMessageTest, testValidate) {
+    auto validator = [](const char* data, size_t size) {
+        if (size > 3) {
+            throw std::runtime_error(std::to_string(size));
+        }
+    };
+    IntMessageBuilder intMessageBuilder{intEncoder, validator};
+    intMessageBuilder.setValue(1);
+    ASSERT_THROW(intMessageBuilder.setValue(1234), std::runtime_error);
+    intMessageBuilder.setValue(123);
+
+    BytesMessageBuilder{validator}.setValue("123");
+    ASSERT_THROW(BytesMessageBuilder{validator}.setValue("1234"), std::runtime_error);
+}

Reply via email to