This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new c671ac0  [feat] Support KeyValue Schema. (#22)
c671ac0 is described below

commit c671ac074cc60297aefb71ac57e5f650ad8f2f84
Author: Baodi Shi <[email protected]>
AuthorDate: Wed Oct 26 17:47:53 2022 +0800

    [feat] Support KeyValue Schema. (#22)
    
    ### Motivation
    
    Add KeyValue schema support to the C++ client. Key and value schemas 
support only the `AVRO` and `JSON` types (consistent with the Java client).
    
    This PR has been reviewed in pulsar repo: 
https://github.com/apache/pulsar/pull/17125
    
    ### Modifications
    
    - A new constructor is added in `SchemaInfo` to combine key and value 
schemas.
    - Add a new `KeyValue` class, to help users merge and parse key and value 
data.
---
 .gitignore                               |   4 +-
 examples/CMakeLists.txt                  |  51 ++++++++------
 examples/SampleKeyValueSchemaConsumer.cc |  62 +++++++++++++++++
 examples/SampleKeyValueSchemaProducer.cc |  61 ++++++++++++++++
 include/pulsar/KeyValue.h                |  84 ++++++++++++++++++++++
 include/pulsar/Message.h                 |   8 +++
 include/pulsar/MessageBuilder.h          |   8 +++
 include/pulsar/Schema.h                  |  32 +++++++++
 lib/Commands.cc                          |   3 +
 lib/ConsumerImpl.cc                      |   4 ++
 lib/KeyValue.cc                          |  38 ++++++++++
 lib/KeyValueImpl.cc                      |  78 +++++++++++++++++++++
 lib/KeyValueImpl.h                       |  50 ++++++++++++++
 lib/Message.cc                           |   3 +
 lib/MessageBuilder.cc                    |   7 +-
 lib/MessageImpl.cc                       |  35 ++++++++++
 lib/MessageImpl.h                        |   5 ++
 lib/ProducerImpl.cc                      |   2 +
 lib/Schema.cc                            |  81 ++++++++++++++++++++++
 tests/KeyValueImplTest.cc                | 115 +++++++++++++++++++++++++++++++
 tests/KeyValueSchemaTest.cc              |  89 ++++++++++++++++++++++++
 tests/MessageTest.cc                     |  48 ++++++++++++-
 tests/SchemaTest.cc                      |  51 +++++++++++++-
 23 files changed, 894 insertions(+), 25 deletions(-)

diff --git a/.gitignore b/.gitignore
index 46092dc..4fd7cf1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -45,6 +45,8 @@ apache-pulsar-client-cpp-*.tar.gz
 /examples/SampleConsumerListener
 /examples/SampleConsumerListenerCApi
 /examples/SampleReaderCApi
+/examples/SampleKeyValueSchemaConsumer
+/examples/SampleKeyValueSchemaProducer
 /examples/SampleFileLogger
 /tests/main
 /tests/pulsar-tests
@@ -98,4 +100,4 @@ vcpkg_installed/
 *.rej
 .tests-container-id.txt
 Testing
-.test-token.txt
\ No newline at end of file
+.test-token.txt
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 72422d2..e84fbbb 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -60,25 +60,36 @@ set(SAMPLE_CONSUMER_LISTENER_C_SOURCES
 set(SAMPLE_READER_C_SOURCES
         SampleReaderCApi.c
 )
+set(SAMPLE_KEY_VALUE_SCHEMA_CONSUMER
+        SampleKeyValueSchemaConsumer.cc
+)
+
+set(SAMPLE_KEY_VALUE_SCHEMA_PRODUCER
+        SampleKeyValueSchemaProducer.cc
+)
 
-add_executable(SampleAsyncProducer        ${SAMPLE_ASYNC_PRODUCER_SOURCES})
-add_executable(SampleConsumer             ${SAMPLE_CONSUMER_SOURCES})
-add_executable(SampleConsumerListener     ${SAMPLE_CONSUMER_LISTENER_SOURCES})
-add_executable(SampleProducer             ${SAMPLE_PRODUCER_SOURCES})
-add_executable(SampleFileLogger           ${SAMPLE_FILE_LOGGER_SOURCES})
-add_executable(SampleProducerCApi         ${SAMPLE_PRODUCER_C_SOURCES})
-add_executable(SampleConsumerCApi         ${SAMPLE_CONSUMER_C_SOURCES})
-add_executable(SampleAsyncConsumerCApi    
${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
-add_executable(SampleConsumerListenerCApi 
${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
-add_executable(SampleReaderCApi           ${SAMPLE_READER_C_SOURCES})
+add_executable(SampleAsyncProducer                    
${SAMPLE_ASYNC_PRODUCER_SOURCES})
+add_executable(SampleConsumer                         
${SAMPLE_CONSUMER_SOURCES})
+add_executable(SampleConsumerListener                 
${SAMPLE_CONSUMER_LISTENER_SOURCES})
+add_executable(SampleProducer                         
${SAMPLE_PRODUCER_SOURCES})
+add_executable(SampleFileLogger                       
${SAMPLE_FILE_LOGGER_SOURCES})
+add_executable(SampleProducerCApi                     
${SAMPLE_PRODUCER_C_SOURCES})
+add_executable(SampleConsumerCApi                     
${SAMPLE_CONSUMER_C_SOURCES})
+add_executable(SampleAsyncConsumerCApi                
${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
+add_executable(SampleConsumerListenerCApi             
${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
+add_executable(SampleReaderCApi                       
${SAMPLE_READER_C_SOURCES})
+add_executable(SampleKeyValueSchemaConsumer           
${SAMPLE_KEY_VALUE_SCHEMA_CONSUMER})
+add_executable(SampleKeyValueSchemaProducer           
${SAMPLE_KEY_VALUE_SCHEMA_PRODUCER})
 
-target_link_libraries(SampleAsyncProducer        ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleConsumer             ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleConsumerListener     ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleProducer             ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleFileLogger           ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleProducerCApi         ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleConsumerCApi         ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleAsyncConsumerCApi    ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleConsumerListenerCApi ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleReaderCApi           ${CLIENT_LIBS} pulsarShared)
+target_link_libraries(SampleAsyncProducer              ${CLIENT_LIBS} 
pulsarShared)
+target_link_libraries(SampleConsumer                   ${CLIENT_LIBS} 
pulsarShared)
+target_link_libraries(SampleConsumerListener           ${CLIENT_LIBS} 
pulsarShared)
+target_link_libraries(SampleProducer                   ${CLIENT_LIBS} 
pulsarShared)
+target_link_libraries(SampleFileLogger                 ${CLIENT_LIBS} 
pulsarShared)
+target_link_libraries(SampleProducerCApi               ${CLIENT_LIBS} 
pulsarShared)
+target_link_libraries(SampleConsumerCApi               ${CLIENT_LIBS} 
pulsarShared)
+target_link_libraries(SampleAsyncConsumerCApi          ${CLIENT_LIBS} 
pulsarShared)
+target_link_libraries(SampleConsumerListenerCApi       ${CLIENT_LIBS} 
pulsarShared)
+target_link_libraries(SampleReaderCApi                 ${CLIENT_LIBS} 
pulsarShared)
+target_link_libraries(SampleKeyValueSchemaConsumer     ${CLIENT_LIBS} 
pulsarShared)
+target_link_libraries(SampleKeyValueSchemaProducer     ${CLIENT_LIBS} 
pulsarShared)
diff --git a/examples/SampleKeyValueSchemaConsumer.cc 
b/examples/SampleKeyValueSchemaConsumer.cc
new file mode 100644
index 0000000..b5ff1b5
--- /dev/null
+++ b/examples/SampleKeyValueSchemaConsumer.cc
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <lib/LogUtils.h>
+#include <pulsar/Client.h>
+
+#include <iostream>
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    std::string jsonSchema =
+        
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+
+    SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+    SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+    SchemaInfo keyValueSchema(keySchema, valueSchema, 
KeyValueEncodingType::INLINE);
+    ConsumerConfiguration consumerConfiguration;
+    consumerConfiguration.setSchema(keyValueSchema);
+
+    Consumer consumer;
+    Result result = client.subscribe("persistent://public/default/kv-schema", 
"consumer-1",
+                                     consumerConfiguration, consumer);
+    if (result != ResultOk) {
+        LOG_ERROR("Failed to subscribe: " << result);
+        return -1;
+    }
+
+    LOG_INFO("Start receive message.")
+
+    Message msg;
+    while (true) {
+        consumer.receive(msg);
+        LOG_INFO("Received: " << msg << "  with payload '" << 
msg.getDataAsString() << "'");
+        LOG_INFO("Received: " << msg << "  with partitionKey '" << 
msg.getPartitionKey() << "'");
+        KeyValue keyValue = msg.getKeyValueData();
+        LOG_INFO("Received: " << msg << "  with key '" << keyValue.getKey() << 
"'");
+        LOG_INFO("Received: " << msg << "  with value '" << 
keyValue.getValueAsString() << "'");
+        consumer.acknowledge(msg);
+    }
+
+    client.close();
+}
diff --git a/examples/SampleKeyValueSchemaProducer.cc 
b/examples/SampleKeyValueSchemaProducer.cc
new file mode 100644
index 0000000..e52cb35
--- /dev/null
+++ b/examples/SampleKeyValueSchemaProducer.cc
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <lib/LogUtils.h>
+#include <pulsar/Client.h>
+
+#include <iostream>
+#include <thread>
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    std::string jsonSchema =
+        
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+
+    SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+    SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+    SchemaInfo keyValueSchema(keySchema, valueSchema, 
KeyValueEncodingType::INLINE);
+    LOG_INFO("KeyValue schema content: " << keyValueSchema.getSchema());
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setSchema(keyValueSchema);
+
+    Producer producer;
+    Result result =
+        client.createProducer("persistent://public/default/kv-schema", 
producerConfiguration, producer);
+    if (result != ResultOk) {
+        LOG_ERROR("Error creating producer: " << result);
+        return -1;
+    }
+
+    std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
+
+    KeyValue keyValue(std::move(jsonData), std::move(jsonData));
+
+    Message msg = MessageBuilder().setContent(keyValue).setProperty("x", 
"1").build();
+    result = producer.send(msg);
+    if (result == ResultOk) {
+        LOG_INFO("send message ok");
+    }
+    client.close();
+}
diff --git a/include/pulsar/KeyValue.h b/include/pulsar/KeyValue.h
new file mode 100644
index 0000000..2ccb26c
--- /dev/null
+++ b/include/pulsar/KeyValue.h
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef KEY_VALUE_HPP_
+#define KEY_VALUE_HPP_
+
+#include <memory>
+#include <string>
+
+#include "Schema.h"
+#include "defines.h"
+
+namespace pulsar {
+
+class KeyValueImpl;
+
+/**
+ * Used when the user works with a key-value schema.
+ */
+class PULSAR_PUBLIC KeyValue {
+   public:
+    /**
+     * Constructor key value, according to keyValueEncodingType, whether key 
and value be encoded together.
+     *
+     * @param key  key data.
+     * @param value value data.
+     * @param keyValueEncodingType key value encoding type.
+     */
+    KeyValue(std::string &&key, std::string &&value);
+
+    /**
+     * Get the key of KeyValue.
+     *
+     * @return character stream for key
+     */
+    std::string getKey() const;
+
+    /**
+     * Get the value of the KeyValue.
+     *
+     *
+     * @return the pointer to the KeyValue value
+     */
+    const void *getValue() const;
+
+    /**
+     * Get the value length of the keyValue.
+     *
+     * @return the length of the KeyValue value
+     */
+    size_t getValueLength() const;
+
+    /**
+     * Get string representation of the KeyValue value.
+     *
+     * @return the string representation of the KeyValue value
+     */
+    std::string getValueAsString() const;
+
+   private:
+    typedef std::shared_ptr<KeyValueImpl> KeyValueImplPtr;
+    KeyValue(KeyValueImplPtr keyValueImplPtr);
+    KeyValueImplPtr impl_;
+    friend class Message;
+    friend class MessageBuilder;
+};
+}  // namespace pulsar
+
+#endif /* KEY_VALUE_HPP_ */
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 0c4afc2..74427a2 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -25,6 +25,7 @@
 #include <memory>
 #include <string>
 
+#include "KeyValue.h"
 #include "MessageId.h"
 
 namespace pulsar {
@@ -92,6 +93,13 @@ class PULSAR_PUBLIC Message {
      */
     std::string getDataAsString() const;
 
+    /**
+     * Get key value message.
+     *
+     * @return key value message.
+     */
+    KeyValue getKeyValueData() const;
+
     /**
      * Get the unique message ID associated with this message.
      *
diff --git a/include/pulsar/MessageBuilder.h b/include/pulsar/MessageBuilder.h
index 2b84d20..a668dd4 100644
--- a/include/pulsar/MessageBuilder.h
+++ b/include/pulsar/MessageBuilder.h
@@ -19,6 +19,7 @@
 #ifndef MESSAGE_BUILDER_H
 #define MESSAGE_BUILDER_H
 
+#include <pulsar/KeyValue.h>
 #include <pulsar/Message.h>
 #include <pulsar/defines.h>
 
@@ -60,6 +61,13 @@ class PULSAR_PUBLIC MessageBuilder {
      */
     MessageBuilder& setContent(std::string&& data);
 
+    /**
+     * Set the key value content of the message
+     *
+     * @param data the content of the key value.
+     */
+    MessageBuilder& setContent(const KeyValue& data);
+
     /**
      * Set content of the message to a buffer already allocated by the caller. 
No copies of
      * this buffer will be made. The caller is responsible to ensure the 
memory buffer is
diff --git a/include/pulsar/Schema.h b/include/pulsar/Schema.h
index ec0802e..ad64c0b 100644
--- a/include/pulsar/Schema.h
+++ b/include/pulsar/Schema.h
@@ -27,6 +27,27 @@
 
 namespace pulsar {
 
+/**
+ *  Encoding types of supported KeyValueSchema for Pulsar messages.
+ */
+enum class KeyValueEncodingType
+{
+    /**
+     * Key is stored as message key, while value is stored as message payload.
+     */
+    SEPARATED,
+
+    /**
+     * Key and value are stored as message payload.
+     */
+    INLINE
+};
+
+// Return string representation of the key-value encoding type
+PULSAR_PUBLIC const char *strEncodingType(pulsar::KeyValueEncodingType 
encodingType);
+
+PULSAR_PUBLIC KeyValueEncodingType enumEncodingType(std::string 
encodingTypeStr);
+
 enum SchemaType
 {
     /**
@@ -143,6 +164,14 @@ class PULSAR_PUBLIC SchemaInfo {
     SchemaInfo(SchemaType schemaType, const std::string &name, const 
std::string &schema,
                const StringMap &properties = StringMap());
 
+    /**
+     * @param keySchema  the key schema.
+     * @param valueSchema  the value schema.
+     * @param keyValueEncodingType Encoding types of supported KeyValueSchema 
for Pulsar messages.
+     */
+    SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo &valueSchema,
+               const KeyValueEncodingType &keyValueEncodingType = 
KeyValueEncodingType::INLINE);
+
     /**
      * @return the schema type
      */
@@ -166,8 +195,11 @@ class PULSAR_PUBLIC SchemaInfo {
    private:
     typedef std::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr;
     SchemaInfoImplPtr impl_;
+    static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
 };
 
 }  // namespace pulsar
 
 PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::SchemaType 
schemaType);
+
+PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, 
pulsar::KeyValueEncodingType encodingType);
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 13febd0..69492c6 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -69,6 +69,7 @@ static inline bool isBuiltInSchema(SchemaType schemaType) {
         case AVRO:
         case PROTOBUF:
         case PROTOBUF_NATIVE:
+        case KEY_VALUE:
             return true;
 
         default:
@@ -90,6 +91,8 @@ static inline proto::Schema_Type getSchemaType(SchemaType 
type) {
             return proto::Schema_Type_Avro;
         case PROTOBUF_NATIVE:
             return proto::Schema_Type_ProtobufNative;
+        case KEY_VALUE:
+            return proto::Schema_Type_KeyValue;
         default:
             return proto::Schema_Type_None;
     }
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 155c5bf..19d5055 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -458,6 +458,9 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
         Lock lock(mutex_);
         numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, 
msg.redelivery_count());
     } else {
+        // Try to convert the payload to key-value data.
+        m.impl_->convertPayloadToKeyValue(config_.getSchema());
+
         const auto startMessageId = startMessageId_.get();
         if (isPersistent_ && startMessageId.is_present() &&
             m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
@@ -582,6 +585,7 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
         Message msg = 
Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
         msg.impl_->setRedeliveryCount(redeliveryCount);
         msg.impl_->setTopicName(batchedMessage.getTopicName());
+        msg.impl_->convertPayloadToKeyValue(config_.getSchema());
 
         if (startMessageId.is_present()) {
             const MessageId& msgId = msg.getMessageId();
diff --git a/lib/KeyValue.cc b/lib/KeyValue.cc
new file mode 100644
index 0000000..e031f52
--- /dev/null
+++ b/lib/KeyValue.cc
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/KeyValue.h>
+
+#include "KeyValueImpl.h"
+
+namespace pulsar {
+
+KeyValue::KeyValue(KeyValueImplPtr impl) : impl_(impl) {}
+
+KeyValue::KeyValue(std::string &&key, std::string &&value)
+    : impl_(std::make_shared<KeyValueImpl>(std::move(key), std::move(value))) 
{}
+
+std::string KeyValue::getKey() const { return impl_->getKey(); }
+
+const void *KeyValue::getValue() const { return impl_->getValue(); }
+
+size_t KeyValue::getValueLength() const { return impl_->getValueLength(); }
+
+std::string KeyValue::getValueAsString() const { return 
impl_->getValueAsString(); }
+
+}  // namespace pulsar
diff --git a/lib/KeyValueImpl.cc b/lib/KeyValueImpl.cc
new file mode 100644
index 0000000..79018d0
--- /dev/null
+++ b/lib/KeyValueImpl.cc
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "KeyValueImpl.h"
+
+#include <pulsar/Schema.h>
+
+#include "SharedBuffer.h"
+
+using namespace pulsar;
+
+namespace pulsar {
+
+KeyValueImpl::KeyValueImpl(const char *data, int length, KeyValueEncodingType 
keyValueEncodingType) {
+    if (keyValueEncodingType == KeyValueEncodingType::INLINE) {
+        SharedBuffer buffer = SharedBuffer::wrap(const_cast<char *>(data), 
length);
+        auto keySize = buffer.readUnsignedInt();
+        if (keySize != INVALID_SIZE) {
+            SharedBuffer keyContent = buffer.slice(0, keySize);
+            key_ = std::string(keyContent.data(), keySize);
+            buffer.consume(keySize);
+        }
+        auto valueSize = buffer.readUnsignedInt();
+        if (valueSize != INVALID_SIZE) {
+            valueBuffer_ = buffer.slice(0, valueSize);
+        }
+    } else {
+        valueBuffer_ = SharedBuffer::wrap(const_cast<char *>(data), length);
+    }
+}
+
+KeyValueImpl::KeyValueImpl(std::string &&key, std::string &&value)
+    : key_(std::move(key)), valueBuffer_(SharedBuffer::take(std::move(value))) 
{}
+
+SharedBuffer KeyValueImpl::getContent(KeyValueEncodingType 
keyValueEncodingType) {
+    if (keyValueEncodingType == KeyValueEncodingType::INLINE) {
+        auto keySize = key_.length();
+        auto valueSize = valueBuffer_.readableBytes();
+        auto buffSize = sizeof(keySize) + keySize + sizeof(valueSize) + 
valueSize;
+        SharedBuffer buffer = SharedBuffer::allocate(buffSize);
+        buffer.writeUnsignedInt(keySize == 0 ? INVALID_SIZE : keySize);
+        buffer.write(key_.c_str(), keySize);
+
+        buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE : valueSize);
+        buffer.write(valueBuffer_.data(), valueSize);
+
+        return buffer;
+    } else {
+        return SharedBuffer::copyFrom(valueBuffer_, 
valueBuffer_.readableBytes());
+    }
+}
+
+std::string KeyValueImpl::getKey() const { return key_; }
+
+const void *KeyValueImpl::getValue() const { return valueBuffer_.data(); }
+
+size_t KeyValueImpl::getValueLength() const { return 
valueBuffer_.readableBytes(); }
+
+std::string KeyValueImpl::getValueAsString() const {
+    return std::string(valueBuffer_.data(), valueBuffer_.readableBytes());
+}
+
+}  // namespace pulsar
diff --git a/lib/KeyValueImpl.h b/lib/KeyValueImpl.h
new file mode 100644
index 0000000..00ed33d
--- /dev/null
+++ b/lib/KeyValueImpl.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef LIB_KEY_VALUEIMPL_H_
+#define LIB_KEY_VALUEIMPL_H_
+
+#include <pulsar/Message.h>
+
+#include "SharedBuffer.h"
+#include "Utils.h"
+
+using namespace pulsar;
+
+namespace pulsar {
+
+class PULSAR_PUBLIC KeyValueImpl {
+   public:
+    KeyValueImpl();
+    KeyValueImpl(const char* data, int length, KeyValueEncodingType 
keyValueEncodingType);
+    KeyValueImpl(std::string&& key, std::string&& value);
+    std::string getKey() const;
+    const void* getValue() const;
+    size_t getValueLength() const;
+    std::string getValueAsString() const;
+    SharedBuffer getContent(KeyValueEncodingType keyValueEncodingType);
+
+   private:
+    std::string key_;
+    SharedBuffer valueBuffer_;
+    static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
+};
+
+} /* namespace pulsar */
+
+#endif /* LIB_KEY_VALUEIMPL_H_ */
diff --git a/lib/Message.cc b/lib/Message.cc
index cb7a75e..84f203f 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -22,6 +22,7 @@
 
 #include <iostream>
 
+#include "KeyValueImpl.h"
 #include "MessageImpl.h"
 #include "PulsarApi.pb.h"
 #include "SharedBuffer.h"
@@ -190,6 +191,8 @@ uint64_t Message::getEventTimestamp() const { return impl_ 
? impl_->getEventTime
 
 bool Message::operator==(const Message& msg) const { return getMessageId() == 
msg.getMessageId(); }
 
+KeyValue Message::getKeyValueData() const { return 
KeyValue(impl_->keyValuePtr); }
+
 PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const 
Message::StringMap& map) {
     // Output at most 10 elements -- appropriate if used for logging.
     s << '{';
diff --git a/lib/MessageBuilder.cc b/lib/MessageBuilder.cc
index 7d8d8cb..b33394e 100644
--- a/lib/MessageBuilder.cc
+++ b/lib/MessageBuilder.cc
@@ -20,9 +20,9 @@
 
 #include <memory>
 #include <stdexcept>
-#include <string>
 #include <utility>
 
+#include "KeyValueImpl.h"
 #include "LogUtils.h"
 #include "MessageImpl.h"
 #include "ObjectPool.h"
@@ -80,6 +80,11 @@ MessageBuilder& MessageBuilder::setContent(std::string&& 
data) {
     return *this;
 }
 
+MessageBuilder& MessageBuilder::setContent(const KeyValue& data) {
+    impl_->keyValuePtr = data.impl_;
+    return *this;
+}
+
 MessageBuilder& MessageBuilder::setProperty(const std::string& name, const 
std::string& value) {
     checkMetadata();
     proto::KeyValue* keyValue = proto::KeyValue().New();
diff --git a/lib/MessageImpl.cc b/lib/MessageImpl.cc
index 5d1edbf..63232e7 100644
--- a/lib/MessageImpl.cc
+++ b/lib/MessageImpl.cc
@@ -102,4 +102,39 @@ void MessageImpl::setSchemaVersion(const std::string& 
schemaVersion) { schemaVer
 
 const std::string& MessageImpl::getSchemaVersion() const { return 
metadata.schema_version(); }
 
+void MessageImpl::convertKeyValueToPayload(const pulsar::SchemaInfo& 
schemaInfo) {
+    if (schemaInfo.getSchemaType() != KEY_VALUE) {
+        // ignore not key_value schema.
+        return;
+    }
+    KeyValueEncodingType keyValueEncodingType = 
getKeyValueEncodingType(schemaInfo);
+    payload = keyValuePtr->getContent(keyValueEncodingType);
+    if (keyValueEncodingType == KeyValueEncodingType::SEPARATED) {
+        setPartitionKey(keyValuePtr->getKey());
+    }
+}
+
+void MessageImpl::convertPayloadToKeyValue(const pulsar::SchemaInfo& 
schemaInfo) {
+    if (schemaInfo.getSchemaType() != KEY_VALUE) {
+        // ignore not key_value schema.
+        return;
+    }
+    keyValuePtr =
+        std::make_shared<KeyValueImpl>(static_cast<const 
char*>(payload.data()), payload.readableBytes(),
+                                       getKeyValueEncodingType(schemaInfo));
+}
+
+KeyValueEncodingType MessageImpl::getKeyValueEncodingType(SchemaInfo 
schemaInfo) {
+    if (schemaInfo.getSchemaType() != KEY_VALUE) {
+        throw std::invalid_argument("Schema not key value type.");
+    }
+    const StringMap& properties = schemaInfo.getProperties();
+    auto data = properties.find("kv.encoding.type");
+    if (data == properties.end()) {
+        throw std::invalid_argument("Not found kv.encoding.type by 
properties");
+    } else {
+        return enumEncodingType(data->second);
+    }
+}
+
 }  // namespace pulsar
diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h
index 587b663..790a021 100644
--- a/lib/MessageImpl.h
+++ b/lib/MessageImpl.h
@@ -22,6 +22,7 @@
 #include <pulsar/Message.h>
 #include <pulsar/MessageId.h>
 
+#include "KeyValueImpl.h"
 #include "PulsarApi.pb.h"
 #include "SharedBuffer.h"
 
@@ -40,6 +41,7 @@ class MessageImpl {
 
     proto::MessageMetadata metadata;
     SharedBuffer payload;
+    std::shared_ptr<KeyValueImpl> keyValuePtr;
     MessageId messageId;
     ClientConnection* cnx_;
     const std::string* topicName_;
@@ -72,6 +74,9 @@ class MessageImpl {
     bool hasSchemaVersion() const;
     const std::string& getSchemaVersion() const;
     void setSchemaVersion(const std::string& value);
+    void convertKeyValueToPayload(const SchemaInfo& schemaInfo);
+    void convertPayloadToKeyValue(const SchemaInfo& schemaInfo);
+    KeyValueEncodingType getKeyValueEncodingType(SchemaInfo schemaInfo);
 
     friend class PulsarWrapper;
     friend class MessageBuilder;
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 1213fce..05ab13d 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -406,6 +406,8 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& 
msg, const SendCallba
         return;
     }
 
+    // Convert the payload before sending the message.
+    msg.impl_->convertKeyValueToPayload(conf_.getSchema());
     const auto& uncompressedPayload = msg.impl_->payload;
     const uint32_t uncompressedSize = uncompressedPayload.readableBytes();
     const auto result = canEnqueueRequest(uncompressedSize);
diff --git a/lib/Schema.cc b/lib/Schema.cc
index 17a301e..300c91d 100644
--- a/lib/Schema.cc
+++ b/lib/Schema.cc
@@ -19,16 +19,58 @@
 #include <pulsar/Schema.h>
 #include <pulsar/defines.h>
 
+#include <boost/property_tree/json_parser.hpp>
+#include <boost/property_tree/ptree.hpp>
 #include <iostream>
 #include <map>
 #include <memory>
 
+#include "SharedBuffer.h"
+using boost::property_tree::ptree;
+using boost::property_tree::read_json;
+using boost::property_tree::write_json;
+
 PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::SchemaType 
schemaType) {
     return s << strSchemaType(schemaType);
 }
 
+PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, 
pulsar::KeyValueEncodingType encodingType) {
+    return s << strEncodingType(encodingType);
+}
+
 namespace pulsar {
 
+static const std::string KEY_SCHEMA_NAME = "key.schema.name";
+static const std::string KEY_SCHEMA_TYPE = "key.schema.type";
+static const std::string KEY_SCHEMA_PROPS = "key.schema.properties";
+static const std::string VALUE_SCHEMA_NAME = "value.schema.name";
+static const std::string VALUE_SCHEMA_TYPE = "value.schema.type";
+static const std::string VALUE_SCHEMA_PROPS = "value.schema.properties";
+static const std::string KV_ENCODING_TYPE = "kv.encoding.type";
+
+PULSAR_PUBLIC const char *strEncodingType(KeyValueEncodingType encodingType) {
+    switch (encodingType) {
+        case KeyValueEncodingType::INLINE:
+            return "INLINE";
+        case KeyValueEncodingType::SEPARATED:
+            return "SEPARATED";
+    };
+    // NOTE: Do not add a default case to the switch above. If a new encoding
+    // type is added and missed there, we would like to get notified. This
+    // return is only here to make the compiler happy.
+    return "UnknownEncodingType";
+}
+
+PULSAR_PUBLIC KeyValueEncodingType enumEncodingType(std::string 
encodingTypeStr) {
+    if (encodingTypeStr == "INLINE") {
+        return KeyValueEncodingType::INLINE;
+    } else if (encodingTypeStr == "SEPARATED") {
+        return KeyValueEncodingType::SEPARATED;
+    } else {
+        throw std::invalid_argument("No match encoding type: " + 
encodingTypeStr);
+    }
+}
+
 PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType) {
     switch (schemaType) {
         case NONE:
@@ -90,6 +132,45 @@ SchemaInfo::SchemaInfo(SchemaType schemaType, const 
std::string &name, const std
                        const StringMap &properties)
     : impl_(std::make_shared<SchemaInfoImpl>(schemaType, name, schema, 
properties)) {}
 
+SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo 
&valueSchema,
+                       const KeyValueEncodingType &keyValueEncodingType) {
+    std::string keySchemaStr = keySchema.getSchema();
+    std::string valueSchemaStr = valueSchema.getSchema();
+    uint32_t keySize = keySchemaStr.size();
+    uint32_t valueSize = valueSchemaStr.size();
+
+    auto buffSize = sizeof keySize + keySize + sizeof valueSize + valueSize;
+    SharedBuffer buffer = SharedBuffer::allocate(buffSize);
+    buffer.writeUnsignedInt(keySize == 0 ? INVALID_SIZE : 
static_cast<uint32_t>(keySize));
+    buffer.write(keySchemaStr.c_str(), static_cast<uint32_t>(keySize));
+    buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE : 
static_cast<uint32_t>(valueSize));
+    buffer.write(valueSchemaStr.c_str(), static_cast<uint32_t>(valueSize));
+
+    auto writeJson = [](const StringMap &properties) {
+        ptree pt;  // '.' in a key must not be read as a path separator
+        for (auto &entry : properties) {
+            pt.put(ptree::path_type(entry.first, '\0'), entry.second);
+        }
+        std::ostringstream buf;
+        write_json(buf, pt, false);
+        auto s = buf.str();
+        s.pop_back();  // presumably drops write_json's trailing newline
+        return s;
+    };
+
+    StringMap properties;
+    properties.emplace(KEY_SCHEMA_NAME, keySchema.getName());
+    properties.emplace(KEY_SCHEMA_TYPE, 
strSchemaType(keySchema.getSchemaType()));
+    properties.emplace(KEY_SCHEMA_PROPS, writeJson(keySchema.getProperties()));
+    properties.emplace(VALUE_SCHEMA_NAME, valueSchema.getName());
+    properties.emplace(VALUE_SCHEMA_TYPE, 
strSchemaType(valueSchema.getSchemaType()));
+    properties.emplace(VALUE_SCHEMA_PROPS, 
writeJson(valueSchema.getProperties()));
+    properties.emplace(KV_ENCODING_TYPE, 
strEncodingType(keyValueEncodingType));
+
+    impl_ = std::make_shared<SchemaInfoImpl>(KEY_VALUE, "KeyValue", 
std::string(buffer.data(), buffSize),
+                                             properties);
+}
+
 SchemaType SchemaInfo::getSchemaType() const { return impl_->type_; }
 
 const std::string &SchemaInfo::getName() const { return impl_->name_; }
diff --git a/tests/KeyValueImplTest.cc b/tests/KeyValueImplTest.cc
new file mode 100644
index 0000000..89770c4
--- /dev/null
+++ b/tests/KeyValueImplTest.cc
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <KeyValueImpl.h>
+#include <gtest/gtest.h>
+
+using namespace pulsar;
+
+TEST(KeyValueTest, testEncodeAndDeCode) {
+    const char* keyContent = "keyContent";
+    const char* valueContent = "valueContent";
+
+    {
+        // INLINE encode: content is 8 bytes of framing plus key and value
+        KeyValueImpl keyValue(keyContent, valueContent);
+        const SharedBuffer content = 
keyValue.getContent(KeyValueEncodingType::INLINE);
+        ASSERT_EQ(content.readableBytes(), 8 + strlen(keyContent) + 
strlen(valueContent));
+
+        // INLINE decode: key and value round-trip; content is not the bare value
+        KeyValueImpl deCodeKeyValue(content.data(), content.readableBytes(), 
KeyValueEncodingType::INLINE);
+        const SharedBuffer deCodeContent = 
deCodeKeyValue.getContent(KeyValueEncodingType::INLINE);
+        ASSERT_EQ(deCodeKeyValue.getKey(), keyContent);
+        ASSERT_EQ(deCodeKeyValue.getValueAsString(), valueContent);
+        ASSERT_TRUE(std::string(deCodeContent.data(), 
deCodeContent.readableBytes()).compare(valueContent) !=
+                    0);
+    }
+
+    {
+        // SEPARATED encode: the content carries only the value bytes
+        KeyValueImpl sepKeyValue(keyContent, valueContent);
+        const SharedBuffer content = 
sepKeyValue.getContent(KeyValueEncodingType::SEPARATED);
+        ASSERT_EQ(sepKeyValue.getKey(), keyContent);
+        ASSERT_EQ(sepKeyValue.getValueAsString(), valueContent);
+        ASSERT_EQ(std::string(content.data(), content.readableBytes()), 
valueContent);
+
+        // SEPARATED decode: check the decoded object, not the encoder
+        KeyValueImpl sepDeKeyValue(content.data(), content.readableBytes(), 
KeyValueEncodingType::SEPARATED);
+        const SharedBuffer deCodeContent = 
sepDeKeyValue.getContent(KeyValueEncodingType::SEPARATED);
+        ASSERT_EQ(sepDeKeyValue.getKey(), "");
+        ASSERT_EQ(sepDeKeyValue.getValueAsString(), valueContent);
+        ASSERT_EQ(std::string(deCodeContent.data(), 
deCodeContent.readableBytes()), valueContent);
+    }
+}
+
+TEST(KeyValueTest, testKeyIsEmpty) {
+    const char* keyContent = "";
+    const char* valueContent = "valueContent";
+
+    {
+        // INLINE encode with an empty key: 8 bytes of framing plus the value
+        KeyValueImpl keyValue(keyContent, valueContent);
+        const SharedBuffer content = 
keyValue.getContent(KeyValueEncodingType::INLINE);
+        ASSERT_EQ(content.readableBytes(), 8 + strlen(keyContent) + 
strlen(valueContent));
+
+        // INLINE decode: the empty key and the value must round-trip
+        KeyValueImpl deCodeKeyValue(content.data(), content.readableBytes(), 
KeyValueEncodingType::INLINE);
+        const SharedBuffer deCodeContent = 
deCodeKeyValue.getContent(KeyValueEncodingType::INLINE);
+        ASSERT_EQ(deCodeKeyValue.getKey(), keyContent);
+        ASSERT_EQ(deCodeKeyValue.getValueAsString(), valueContent);
+        ASSERT_TRUE(std::string(deCodeContent.data(), 
deCodeContent.readableBytes()).compare(valueContent) !=
+                    0);
+    }
+
+    {
+        // SEPARATED encode: the content carries only the value bytes
+        KeyValueImpl sepKeyValue(keyContent, valueContent);
+        const SharedBuffer content = 
sepKeyValue.getContent(KeyValueEncodingType::SEPARATED);
+        ASSERT_EQ(sepKeyValue.getKey(), keyContent);
+        ASSERT_EQ(sepKeyValue.getValueAsString(), valueContent);
+        ASSERT_EQ(std::string(content.data(), content.readableBytes()), 
valueContent);
+    }
+}
+
+TEST(KeyValueTest, testValueIsEmpty) {
+    const char* keyContent = "keyContent";
+    const char* valueContent = "";
+
+    {
+        // INLINE encode with an empty value: 8 bytes of framing plus the key
+        KeyValueImpl keyValue(keyContent, valueContent);
+        const SharedBuffer content = 
keyValue.getContent(KeyValueEncodingType::INLINE);
+        ASSERT_EQ(content.readableBytes(), 8 + strlen(keyContent) + 
strlen(valueContent));
+
+        // INLINE decode: re-encode the decoded object, as the sibling tests do
+        KeyValueImpl deCodeKeyValue(content.data(), content.readableBytes(), 
KeyValueEncodingType::INLINE);
+        const SharedBuffer deCodeContent = 
deCodeKeyValue.getContent(KeyValueEncodingType::INLINE);
+        ASSERT_EQ(deCodeKeyValue.getKey(), keyContent);
+        ASSERT_EQ(deCodeKeyValue.getValueAsString(), valueContent);
+        ASSERT_NE(std::string(deCodeContent.data(), 
deCodeContent.readableBytes()), valueContent);
+    }
+
+    {
+        // SEPARATED encode: the content carries only the (empty) value bytes
+        KeyValueImpl sepKeyValue(keyContent, valueContent);
+        const SharedBuffer content = 
sepKeyValue.getContent(KeyValueEncodingType::SEPARATED);
+        ASSERT_EQ(sepKeyValue.getKey(), keyContent);
+        ASSERT_EQ(sepKeyValue.getValueAsString(), valueContent);
+        ASSERT_EQ(std::string(content.data(), content.readableBytes()), 
valueContent);
+    }
+}
diff --git a/tests/KeyValueSchemaTest.cc b/tests/KeyValueSchemaTest.cc
new file mode 100644
index 0000000..02b55ee
--- /dev/null
+++ b/tests/KeyValueSchemaTest.cc
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include "lib/LogUtils.h"
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+class KeyValueSchemaTest : public 
::testing::TestWithParam<KeyValueEncodingType> {
+   public:
+    void TearDown() override { client.close(); }
+
+    void createProducer(const std::string& topic, Producer& producer) {
+        ProducerConfiguration configProducer;
+        configProducer.setSchema(getKeyValueSchema());
+        configProducer.setBatchingEnabled(false);
+        ASSERT_EQ(ResultOk, client.createProducer(topic, configProducer, 
producer));
+    }
+
+    void createConsumer(const std::string& topic, Consumer& consumer) {
+        ConsumerConfiguration configConsumer;
+        configConsumer.setSchema(getKeyValueSchema());
+        ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-kv", configConsumer, 
consumer));
+    }
+
+    SchemaInfo getKeyValueSchema() {
+        SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+        SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+        return SchemaInfo(keySchema, valueSchema, GetParam());
+    }
+
+   private:
+    Client client{lookupUrl};
+    std::string jsonSchema =
+        
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+};
+
+TEST_P(KeyValueSchemaTest, testKeyValueSchema) {
+    auto encodingType = GetParam();
+    const std::string topicName =
+        "testKeyValueSchema-" + std::string(strEncodingType(encodingType)) + 
std::to_string(time(nullptr));
+
+    Producer producer;
+    createProducer(topicName, producer);
+    Consumer consumer;
+    createConsumer(topicName, consumer);
+
+    // Send one KeyValue message and require that receiving it succeeds.
+    std::string keyData = "{\"re\":2.1,\"im\":1.23}";
+    std::string valueData = "{\"re\":2.1,\"im\":1.23}";
+    KeyValue keyValue((std::string(keyData)), std::string(valueData));
+    Message msg = MessageBuilder().setContent(keyValue).setProperty("x", 
"1").build();
+    ASSERT_EQ(ResultOk, producer.send(msg));
+
+    Message receiveMsg;
+    ASSERT_EQ(ResultOk, consumer.receive(receiveMsg));
+    KeyValue keyValueData = receiveMsg.getKeyValueData();
+
+    if (encodingType == pulsar::KeyValueEncodingType::INLINE) {
+        ASSERT_EQ(receiveMsg.getPartitionKey(), "");
+        ASSERT_EQ(keyValueData.getKey(), keyData);
+    } else {
+        ASSERT_EQ(receiveMsg.getPartitionKey(), keyData);
+        ASSERT_EQ(keyValueData.getKey(), "");
+    }
+    ASSERT_EQ(keyValueData.getValueAsString(), valueData);
+}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, KeyValueSchemaTest,
+                        ::testing::Values(KeyValueEncodingType::INLINE, 
KeyValueEncodingType::SEPARATED));
diff --git a/tests/MessageTest.cc b/tests/MessageTest.cc
index 7e26431..3dfe221 100644
--- a/tests/MessageTest.cc
+++ b/tests/MessageTest.cc
@@ -22,7 +22,7 @@
 
 #include <string>
 
-#include "lib/LogUtils.h"
+#include "MessageImpl.h"
 
 using namespace pulsar;
 TEST(MessageTest, testMessageContents) {
@@ -101,3 +101,49 @@ TEST(MessageTest, testMessageBuilder) {
         ASSERT_EQ(msg.getData(), originalAddress);
     }
 }
+
+TEST(MessageTest, testMessageImplKeyValuePayloadCovert) {
+    const char* keyContent = "keyContent";
+    const char* valueContent = "valueContent";
+
+    std::string jsonSchema =
+        
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+    SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+
+    // INLINE: key and value both travel in the payload; no partition key.
+    {
+        SchemaInfo keyValueSchema(keySchema, valueSchema, 
KeyValueEncodingType::INLINE);
+        MessageImpl msgImpl;
+        std::shared_ptr<KeyValueImpl> keyValuePtr = 
std::make_shared<KeyValueImpl>(keyContent, valueContent);
+        msgImpl.keyValuePtr = keyValuePtr;
+        msgImpl.convertKeyValueToPayload(keyValueSchema);
+        ASSERT_EQ(msgImpl.payload.readableBytes(), 30);  // 8 + 10 + 12
+        ASSERT_EQ(msgImpl.getPartitionKey(), "");
+
+        MessageImpl deMsgImpl;
+        deMsgImpl.payload = msgImpl.payload;
+        deMsgImpl.convertPayloadToKeyValue(keyValueSchema);
+
+        ASSERT_EQ(deMsgImpl.keyValuePtr->getKey(), keyContent);
+        ASSERT_EQ(deMsgImpl.keyValuePtr->getValueAsString(), valueContent);
+    }
+
+    // SEPARATED: payload holds only the value; the key is the partition key.
+    {
+        SchemaInfo keyValueSchema(keySchema, valueSchema, 
KeyValueEncodingType::SEPARATED);
+        MessageImpl msgImpl;
+        std::shared_ptr<KeyValueImpl> keyValuePtr = 
std::make_shared<KeyValueImpl>(keyContent, valueContent);
+        msgImpl.keyValuePtr = keyValuePtr;
+        msgImpl.convertKeyValueToPayload(keyValueSchema);
+        ASSERT_EQ(msgImpl.payload.readableBytes(), 12);  // strlen(valueContent)
+        ASSERT_EQ(msgImpl.getPartitionKey(), keyContent);
+
+        MessageImpl deMsgImpl;
+        deMsgImpl.payload = msgImpl.payload;
+        deMsgImpl.convertPayloadToKeyValue(keyValueSchema);
+
+        ASSERT_EQ(deMsgImpl.keyValuePtr->getKey(), "");
+        ASSERT_EQ(deMsgImpl.keyValuePtr->getValueAsString(), valueContent);
+    }
+}
diff --git a/tests/SchemaTest.cc b/tests/SchemaTest.cc
index f153652..cf0f8c7 100644
--- a/tests/SchemaTest.cc
+++ b/tests/SchemaTest.cc
@@ -19,13 +19,14 @@
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 
+#include "SharedBuffer.h"
+
 using namespace pulsar;
 
 static std::string lookupUrl = "pulsar://localhost:6650";
 
 static const std::string exampleSchema =
-    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
-    
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+    
R"({"type":"record","name":"Example","namespace":"test","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})";
 
 TEST(SchemaTest, testSchema) {
     ClientConfiguration config;
@@ -107,3 +108,49 @@ TEST(SchemaTest, testHasSchemaVersion) {
 
     client.close();
 }
+
+TEST(SchemaTest, testKeyValueSchema) {
+    SchemaInfo keySchema(SchemaType::AVRO, "String", exampleSchema);
+    SchemaInfo valueSchema(SchemaType::AVRO, "String", exampleSchema);
+    SchemaInfo keyValueSchema(keySchema, valueSchema, 
KeyValueEncodingType::INLINE);
+    ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
+    ASSERT_EQ(keyValueSchema.getSchema().size(),
+              8 + keySchema.getSchema().size() + 
valueSchema.getSchema().size());
+}
+
+TEST(SchemaTest, testKeySchemaIsEmpty) {
+    SchemaInfo keySchema(SchemaType::AVRO, "String", "");
+    SchemaInfo valueSchema(SchemaType::AVRO, "String", exampleSchema);
+    SchemaInfo keyValueSchema(keySchema, valueSchema, 
KeyValueEncodingType::INLINE);
+    ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
+    ASSERT_EQ(keyValueSchema.getSchema().size(),
+              8 + keySchema.getSchema().size() + 
valueSchema.getSchema().size());
+
+    SharedBuffer buffer = 
SharedBuffer::wrap(const_cast<char*>(keyValueSchema.getSchema().c_str()),
+                                             
keyValueSchema.getSchema().size());
+    int keySchemaSize = buffer.readUnsignedInt();
+    ASSERT_EQ(keySchemaSize, -1);
+    int valueSchemaSize = buffer.readUnsignedInt();
+    ASSERT_EQ(valueSchemaSize, valueSchema.getSchema().size());
+    std::string valueSchemaStr(buffer.slice(0, valueSchemaSize).data(), 
valueSchemaSize);
+    ASSERT_EQ(valueSchema.getSchema(), valueSchemaStr);
+}
+
+TEST(SchemaTest, testValueSchemaIsEmpty) {
+    SchemaInfo keySchema(SchemaType::AVRO, "String", exampleSchema);
+    SchemaInfo valueSchema(SchemaType::AVRO, "String", "");
+    SchemaInfo keyValueSchema(keySchema, valueSchema, 
KeyValueEncodingType::INLINE);
+    ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
+    ASSERT_EQ(keyValueSchema.getSchema().size(),
+              8 + keySchema.getSchema().size() + 
valueSchema.getSchema().size());
+
+    SharedBuffer buffer = 
SharedBuffer::wrap(const_cast<char*>(keyValueSchema.getSchema().c_str()),
+                                             
keyValueSchema.getSchema().size());
+    int keySchemaSize = buffer.readUnsignedInt();
+    ASSERT_EQ(keySchemaSize, keySchema.getSchema().size());
+    std::string keySchemaStr(buffer.slice(0, keySchemaSize).data(), 
keySchemaSize);
+    ASSERT_EQ(keySchemaStr, keySchema.getSchema());
+    buffer.consume(keySchemaSize);
+    int valueSchemaSize = buffer.readUnsignedInt();
+    ASSERT_EQ(valueSchemaSize, -1);
+}

Reply via email to