This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new c671ac0 [feat] Support KeyValue Schema. (#22)
c671ac0 is described below
commit c671ac074cc60297aefb71ac57e5f650ad8f2f84
Author: Baodi Shi <[email protected]>
AuthorDate: Wed Oct 26 17:47:53 2022 +0800
[feat] Support KeyValue Schema. (#22)
### Motivation
C++ client Support KeyValue Schema. For key and value schema, just only
support `AVRO` and `JSON` type(consistent with java client).
This PR has been reviewed in pulsar repo:
https://github.com/apache/pulsar/pull/17125
### Modifications
- A new constructor is added in `SchemaInfo` to combine key and value
schemas.
- Add a new `KeyValue` class, to help users merge and parse key and value
data.
---
.gitignore | 4 +-
examples/CMakeLists.txt | 51 ++++++++------
examples/SampleKeyValueSchemaConsumer.cc | 62 +++++++++++++++++
examples/SampleKeyValueSchemaProducer.cc | 61 ++++++++++++++++
include/pulsar/KeyValue.h | 84 ++++++++++++++++++++++
include/pulsar/Message.h | 8 +++
include/pulsar/MessageBuilder.h | 8 +++
include/pulsar/Schema.h | 32 +++++++++
lib/Commands.cc | 3 +
lib/ConsumerImpl.cc | 4 ++
lib/KeyValue.cc | 38 ++++++++++
lib/KeyValueImpl.cc | 78 +++++++++++++++++++++
lib/KeyValueImpl.h | 50 ++++++++++++++
lib/Message.cc | 3 +
lib/MessageBuilder.cc | 7 +-
lib/MessageImpl.cc | 35 ++++++++++
lib/MessageImpl.h | 5 ++
lib/ProducerImpl.cc | 2 +
lib/Schema.cc | 81 ++++++++++++++++++++++
tests/KeyValueImplTest.cc | 115 +++++++++++++++++++++++++++++++
tests/KeyValueSchemaTest.cc | 89 ++++++++++++++++++++++++
tests/MessageTest.cc | 48 ++++++++++++-
tests/SchemaTest.cc | 51 +++++++++++++-
23 files changed, 894 insertions(+), 25 deletions(-)
diff --git a/.gitignore b/.gitignore
index 46092dc..4fd7cf1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -45,6 +45,8 @@ apache-pulsar-client-cpp-*.tar.gz
/examples/SampleConsumerListener
/examples/SampleConsumerListenerCApi
/examples/SampleReaderCApi
+/examples/SampleKeyValueSchemaConsumer
+/examples/SampleKeyValueSchemaProducer
/examples/SampleFileLogger
/tests/main
/tests/pulsar-tests
@@ -98,4 +100,4 @@ vcpkg_installed/
*.rej
.tests-container-id.txt
Testing
-.test-token.txt
\ No newline at end of file
+.test-token.txt
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 72422d2..e84fbbb 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -60,25 +60,36 @@ set(SAMPLE_CONSUMER_LISTENER_C_SOURCES
set(SAMPLE_READER_C_SOURCES
SampleReaderCApi.c
)
+set(SAMPLE_KEY_VALUE_SCHEMA_CONSUMER
+ SampleKeyValueSchemaConsumer.cc
+)
+
+set(SAMPLE_KEY_VALUE_SCHEMA_PRODUCER
+ SampleKeyValueSchemaProducer.cc
+)
-add_executable(SampleAsyncProducer ${SAMPLE_ASYNC_PRODUCER_SOURCES})
-add_executable(SampleConsumer ${SAMPLE_CONSUMER_SOURCES})
-add_executable(SampleConsumerListener ${SAMPLE_CONSUMER_LISTENER_SOURCES})
-add_executable(SampleProducer ${SAMPLE_PRODUCER_SOURCES})
-add_executable(SampleFileLogger ${SAMPLE_FILE_LOGGER_SOURCES})
-add_executable(SampleProducerCApi ${SAMPLE_PRODUCER_C_SOURCES})
-add_executable(SampleConsumerCApi ${SAMPLE_CONSUMER_C_SOURCES})
-add_executable(SampleAsyncConsumerCApi
${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
-add_executable(SampleConsumerListenerCApi
${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
-add_executable(SampleReaderCApi ${SAMPLE_READER_C_SOURCES})
+add_executable(SampleAsyncProducer
${SAMPLE_ASYNC_PRODUCER_SOURCES})
+add_executable(SampleConsumer
${SAMPLE_CONSUMER_SOURCES})
+add_executable(SampleConsumerListener
${SAMPLE_CONSUMER_LISTENER_SOURCES})
+add_executable(SampleProducer
${SAMPLE_PRODUCER_SOURCES})
+add_executable(SampleFileLogger
${SAMPLE_FILE_LOGGER_SOURCES})
+add_executable(SampleProducerCApi
${SAMPLE_PRODUCER_C_SOURCES})
+add_executable(SampleConsumerCApi
${SAMPLE_CONSUMER_C_SOURCES})
+add_executable(SampleAsyncConsumerCApi
${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
+add_executable(SampleConsumerListenerCApi
${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
+add_executable(SampleReaderCApi
${SAMPLE_READER_C_SOURCES})
+add_executable(SampleKeyValueSchemaConsumer
${SAMPLE_KEY_VALUE_SCHEMA_CONSUMER})
+add_executable(SampleKeyValueSchemaProducer
${SAMPLE_KEY_VALUE_SCHEMA_PRODUCER})
-target_link_libraries(SampleAsyncProducer ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleConsumer ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleConsumerListener ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleProducer ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleFileLogger ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleProducerCApi ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleConsumerCApi ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleAsyncConsumerCApi ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleConsumerListenerCApi ${CLIENT_LIBS} pulsarShared)
-target_link_libraries(SampleReaderCApi ${CLIENT_LIBS} pulsarShared)
+target_link_libraries(SampleAsyncProducer ${CLIENT_LIBS}
pulsarShared)
+target_link_libraries(SampleConsumer ${CLIENT_LIBS}
pulsarShared)
+target_link_libraries(SampleConsumerListener ${CLIENT_LIBS}
pulsarShared)
+target_link_libraries(SampleProducer ${CLIENT_LIBS}
pulsarShared)
+target_link_libraries(SampleFileLogger ${CLIENT_LIBS}
pulsarShared)
+target_link_libraries(SampleProducerCApi ${CLIENT_LIBS}
pulsarShared)
+target_link_libraries(SampleConsumerCApi ${CLIENT_LIBS}
pulsarShared)
+target_link_libraries(SampleAsyncConsumerCApi ${CLIENT_LIBS}
pulsarShared)
+target_link_libraries(SampleConsumerListenerCApi ${CLIENT_LIBS}
pulsarShared)
+target_link_libraries(SampleReaderCApi ${CLIENT_LIBS}
pulsarShared)
+target_link_libraries(SampleKeyValueSchemaConsumer ${CLIENT_LIBS}
pulsarShared)
+target_link_libraries(SampleKeyValueSchemaProducer ${CLIENT_LIBS}
pulsarShared)
diff --git a/examples/SampleKeyValueSchemaConsumer.cc
b/examples/SampleKeyValueSchemaConsumer.cc
new file mode 100644
index 0000000..b5ff1b5
--- /dev/null
+++ b/examples/SampleKeyValueSchemaConsumer.cc
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <lib/LogUtils.h>
+#include <pulsar/Client.h>
+
+#include <iostream>
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+int main() {
+ Client client("pulsar://localhost:6650");
+
+ std::string jsonSchema =
+
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+
+ SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+ SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+ SchemaInfo keyValueSchema(keySchema, valueSchema,
KeyValueEncodingType::INLINE);
+ ConsumerConfiguration consumerConfiguration;
+ consumerConfiguration.setSchema(keyValueSchema);
+
+ Consumer consumer;
+ Result result = client.subscribe("persistent://public/default/kv-schema",
"consumer-1",
+ consumerConfiguration, consumer);
+ if (result != ResultOk) {
+ LOG_ERROR("Failed to subscribe: " << result);
+ return -1;
+ }
+
+ LOG_INFO("Start receive message.")
+
+ Message msg;
+ while (true) {
+ consumer.receive(msg);
+ LOG_INFO("Received: " << msg << " with payload '" <<
msg.getDataAsString() << "'");
+ LOG_INFO("Received: " << msg << " with partitionKey '" <<
msg.getPartitionKey() << "'");
+ KeyValue keyValue = msg.getKeyValueData();
+ LOG_INFO("Received: " << msg << " with key '" << keyValue.getKey() <<
"'");
+ LOG_INFO("Received: " << msg << " with value '" <<
keyValue.getValueAsString() << "'");
+ consumer.acknowledge(msg);
+ }
+
+ client.close();
+}
diff --git a/examples/SampleKeyValueSchemaProducer.cc
b/examples/SampleKeyValueSchemaProducer.cc
new file mode 100644
index 0000000..e52cb35
--- /dev/null
+++ b/examples/SampleKeyValueSchemaProducer.cc
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <lib/LogUtils.h>
+#include <pulsar/Client.h>
+
+#include <iostream>
+#include <thread>
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+int main() {
+ Client client("pulsar://localhost:6650");
+
+ std::string jsonSchema =
+
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+
+ SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+ SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+ SchemaInfo keyValueSchema(keySchema, valueSchema,
KeyValueEncodingType::INLINE);
+ LOG_INFO("KeyValue schema content: " << keyValueSchema.getSchema());
+
+ ProducerConfiguration producerConfiguration;
+ producerConfiguration.setSchema(keyValueSchema);
+
+ Producer producer;
+ Result result =
+ client.createProducer("persistent://public/default/kv-schema",
producerConfiguration, producer);
+ if (result != ResultOk) {
+ LOG_ERROR("Error creating producer: " << result);
+ return -1;
+ }
+
+ std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
+
+ KeyValue keyValue(std::move(jsonData), std::move(jsonData));
+
+ Message msg = MessageBuilder().setContent(keyValue).setProperty("x",
"1").build();
+ result = producer.send(msg);
+ if (result == ResultOk) {
+ LOG_INFO("send message ok");
+ }
+ client.close();
+}
diff --git a/include/pulsar/KeyValue.h b/include/pulsar/KeyValue.h
new file mode 100644
index 0000000..2ccb26c
--- /dev/null
+++ b/include/pulsar/KeyValue.h
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef KEY_VALUE_HPP_
+#define KEY_VALUE_HPP_
+
+#include <memory>
+#include <string>
+
+#include "Schema.h"
+#include "defines.h"
+
+namespace pulsar {
+
+class KeyValueImpl;
+
+/**
+ * Use to when the user uses key value schema.
+ */
+class PULSAR_PUBLIC KeyValue {
+ public:
+ /**
+ * Constructor key value, according to keyValueEncodingType, whether key
and value be encoded together.
+ *
+ * @param key key data.
+ * @param value value data.
+ * @param keyValueEncodingType key value encoding type.
+ */
+ KeyValue(std::string &&key, std::string &&value);
+
+ /**
+ * Get the key of KeyValue.
+ *
+ * @return character stream for key
+ */
+ std::string getKey() const;
+
+ /**
+ * Get the value of the KeyValue.
+ *
+ *
+ * @return the pointer to the KeyValue value
+ */
+ const void *getValue() const;
+
+ /**
+ * Get the value length of the keyValue.
+ *
+ * @return the length of the KeyValue value
+ */
+ size_t getValueLength() const;
+
+ /**
+ * Get string representation of the KeyValue value.
+ *
+ * @return the string representation of the KeyValue value
+ */
+ std::string getValueAsString() const;
+
+ private:
+ typedef std::shared_ptr<KeyValueImpl> KeyValueImplPtr;
+ KeyValue(KeyValueImplPtr keyValueImplPtr);
+ KeyValueImplPtr impl_;
+ friend class Message;
+ friend class MessageBuilder;
+};
+} // namespace pulsar
+
+#endif /* KEY_VALUE_HPP_ */
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 0c4afc2..74427a2 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -25,6 +25,7 @@
#include <memory>
#include <string>
+#include "KeyValue.h"
#include "MessageId.h"
namespace pulsar {
@@ -92,6 +93,13 @@ class PULSAR_PUBLIC Message {
*/
std::string getDataAsString() const;
+ /**
+ * Get key value message.
+ *
+ * @return key value message.
+ */
+ KeyValue getKeyValueData() const;
+
/**
* Get the unique message ID associated with this message.
*
diff --git a/include/pulsar/MessageBuilder.h b/include/pulsar/MessageBuilder.h
index 2b84d20..a668dd4 100644
--- a/include/pulsar/MessageBuilder.h
+++ b/include/pulsar/MessageBuilder.h
@@ -19,6 +19,7 @@
#ifndef MESSAGE_BUILDER_H
#define MESSAGE_BUILDER_H
+#include <pulsar/KeyValue.h>
#include <pulsar/Message.h>
#include <pulsar/defines.h>
@@ -60,6 +61,13 @@ class PULSAR_PUBLIC MessageBuilder {
*/
MessageBuilder& setContent(std::string&& data);
+ /**
+ * Set the key value content of the message
+ *
+ * @param data the content of the key value.
+ */
+ MessageBuilder& setContent(const KeyValue& data);
+
/**
* Set content of the message to a buffer already allocated by the caller.
No copies of
* this buffer will be made. The caller is responsible to ensure the
memory buffer is
diff --git a/include/pulsar/Schema.h b/include/pulsar/Schema.h
index ec0802e..ad64c0b 100644
--- a/include/pulsar/Schema.h
+++ b/include/pulsar/Schema.h
@@ -27,6 +27,27 @@
namespace pulsar {
+/**
+ * Encoding types of supported KeyValueSchema for Pulsar messages.
+ */
+enum class KeyValueEncodingType
+{
+ /**
+ * Key is stored as message key, while value is stored as message payload.
+ */
+ SEPARATED,
+
+ /**
+ * Key and value are stored as message payload.
+ */
+ INLINE
+};
+
+// Return string representation of result code
+PULSAR_PUBLIC const char *strEncodingType(pulsar::KeyValueEncodingType
encodingType);
+
+PULSAR_PUBLIC KeyValueEncodingType enumEncodingType(std::string
encodingTypeStr);
+
enum SchemaType
{
/**
@@ -143,6 +164,14 @@ class PULSAR_PUBLIC SchemaInfo {
SchemaInfo(SchemaType schemaType, const std::string &name, const
std::string &schema,
const StringMap &properties = StringMap());
+ /**
+ * @param keySchema the key schema.
+ * @param valueSchema the value schema.
+ * @param keyValueEncodingType Encoding types of supported KeyValueSchema
for Pulsar messages.
+ */
+ SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo &valueSchema,
+ const KeyValueEncodingType &keyValueEncodingType =
KeyValueEncodingType::INLINE);
+
/**
* @return the schema type
*/
@@ -166,8 +195,11 @@ class PULSAR_PUBLIC SchemaInfo {
private:
typedef std::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr;
SchemaInfoImplPtr impl_;
+ static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
};
} // namespace pulsar
PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::SchemaType
schemaType);
+
+PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s,
pulsar::KeyValueEncodingType encodingType);
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 13febd0..69492c6 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -69,6 +69,7 @@ static inline bool isBuiltInSchema(SchemaType schemaType) {
case AVRO:
case PROTOBUF:
case PROTOBUF_NATIVE:
+ case KEY_VALUE:
return true;
default:
@@ -90,6 +91,8 @@ static inline proto::Schema_Type getSchemaType(SchemaType
type) {
return proto::Schema_Type_Avro;
case PROTOBUF_NATIVE:
return proto::Schema_Type_ProtobufNative;
+ case KEY_VALUE:
+ return proto::Schema_Type_KeyValue;
default:
return proto::Schema_Type_None;
}
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 155c5bf..19d5055 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -458,6 +458,9 @@ void ConsumerImpl::messageReceived(const
ClientConnectionPtr& cnx, const proto::
Lock lock(mutex_);
numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m,
msg.redelivery_count());
} else {
+ // try convery key value data.
+ m.impl_->convertPayloadToKeyValue(config_.getSchema());
+
const auto startMessageId = startMessageId_.get();
if (isPersistent_ && startMessageId.is_present() &&
m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
@@ -582,6 +585,7 @@ uint32_t
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
Message msg =
Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
msg.impl_->setRedeliveryCount(redeliveryCount);
msg.impl_->setTopicName(batchedMessage.getTopicName());
+ msg.impl_->convertPayloadToKeyValue(config_.getSchema());
if (startMessageId.is_present()) {
const MessageId& msgId = msg.getMessageId();
diff --git a/lib/KeyValue.cc b/lib/KeyValue.cc
new file mode 100644
index 0000000..e031f52
--- /dev/null
+++ b/lib/KeyValue.cc
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/KeyValue.h>
+
+#include "KeyValueImpl.h"
+
+namespace pulsar {
+
+KeyValue::KeyValue(KeyValueImplPtr impl) : impl_(impl) {}
+
+KeyValue::KeyValue(std::string &&key, std::string &&value)
+ : impl_(std::make_shared<KeyValueImpl>(std::move(key), std::move(value)))
{}
+
+std::string KeyValue::getKey() const { return impl_->getKey(); }
+
+const void *KeyValue::getValue() const { return impl_->getValue(); }
+
+size_t KeyValue::getValueLength() const { return impl_->getValueLength(); }
+
+std::string KeyValue::getValueAsString() const { return
impl_->getValueAsString(); }
+
+} // namespace pulsar
diff --git a/lib/KeyValueImpl.cc b/lib/KeyValueImpl.cc
new file mode 100644
index 0000000..79018d0
--- /dev/null
+++ b/lib/KeyValueImpl.cc
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "KeyValueImpl.h"
+
+#include <pulsar/Schema.h>
+
+#include "SharedBuffer.h"
+
+using namespace pulsar;
+
+namespace pulsar {
+
+KeyValueImpl::KeyValueImpl(const char *data, int length, KeyValueEncodingType
keyValueEncodingType) {
+ if (keyValueEncodingType == KeyValueEncodingType::INLINE) {
+ SharedBuffer buffer = SharedBuffer::wrap(const_cast<char *>(data),
length);
+ auto keySize = buffer.readUnsignedInt();
+ if (keySize != INVALID_SIZE) {
+ SharedBuffer keyContent = buffer.slice(0, keySize);
+ key_ = std::string(keyContent.data(), keySize);
+ buffer.consume(keySize);
+ }
+ auto valueSize = buffer.readUnsignedInt();
+ if (valueSize != INVALID_SIZE) {
+ valueBuffer_ = buffer.slice(0, valueSize);
+ }
+ } else {
+ valueBuffer_ = SharedBuffer::wrap(const_cast<char *>(data), length);
+ }
+}
+
+KeyValueImpl::KeyValueImpl(std::string &&key, std::string &&value)
+ : key_(std::move(key)), valueBuffer_(SharedBuffer::take(std::move(value)))
{}
+
+SharedBuffer KeyValueImpl::getContent(KeyValueEncodingType
keyValueEncodingType) {
+ if (keyValueEncodingType == KeyValueEncodingType::INLINE) {
+ auto keySize = key_.length();
+ auto valueSize = valueBuffer_.readableBytes();
+ auto buffSize = sizeof(keySize) + keySize + sizeof(valueSize) +
valueSize;
+ SharedBuffer buffer = SharedBuffer::allocate(buffSize);
+ buffer.writeUnsignedInt(keySize == 0 ? INVALID_SIZE : keySize);
+ buffer.write(key_.c_str(), keySize);
+
+ buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE : valueSize);
+ buffer.write(valueBuffer_.data(), valueSize);
+
+ return buffer;
+ } else {
+ return SharedBuffer::copyFrom(valueBuffer_,
valueBuffer_.readableBytes());
+ }
+}
+
+std::string KeyValueImpl::getKey() const { return key_; }
+
+const void *KeyValueImpl::getValue() const { return valueBuffer_.data(); }
+
+size_t KeyValueImpl::getValueLength() const { return
valueBuffer_.readableBytes(); }
+
+std::string KeyValueImpl::getValueAsString() const {
+ return std::string(valueBuffer_.data(), valueBuffer_.readableBytes());
+}
+
+} // namespace pulsar
diff --git a/lib/KeyValueImpl.h b/lib/KeyValueImpl.h
new file mode 100644
index 0000000..00ed33d
--- /dev/null
+++ b/lib/KeyValueImpl.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef LIB_KEY_VALUEIMPL_H_
+#define LIB_KEY_VALUEIMPL_H_
+
+#include <pulsar/Message.h>
+
+#include "SharedBuffer.h"
+#include "Utils.h"
+
+using namespace pulsar;
+
+namespace pulsar {
+
+class PULSAR_PUBLIC KeyValueImpl {
+ public:
+ KeyValueImpl();
+ KeyValueImpl(const char* data, int length, KeyValueEncodingType
keyValueEncodingType);
+ KeyValueImpl(std::string&& key, std::string&& value);
+ std::string getKey() const;
+ const void* getValue() const;
+ size_t getValueLength() const;
+ std::string getValueAsString() const;
+ SharedBuffer getContent(KeyValueEncodingType keyValueEncodingType);
+
+ private:
+ std::string key_;
+ SharedBuffer valueBuffer_;
+ static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
+};
+
+} /* namespace pulsar */
+
+#endif /* LIB_COMMANDS_H_ */
diff --git a/lib/Message.cc b/lib/Message.cc
index cb7a75e..84f203f 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -22,6 +22,7 @@
#include <iostream>
+#include "KeyValueImpl.h"
#include "MessageImpl.h"
#include "PulsarApi.pb.h"
#include "SharedBuffer.h"
@@ -190,6 +191,8 @@ uint64_t Message::getEventTimestamp() const { return impl_
? impl_->getEventTime
bool Message::operator==(const Message& msg) const { return getMessageId() ==
msg.getMessageId(); }
+KeyValue Message::getKeyValueData() const { return
KeyValue(impl_->keyValuePtr); }
+
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const
Message::StringMap& map) {
// Output at most 10 elements -- appropriate if used for logging.
s << '{';
diff --git a/lib/MessageBuilder.cc b/lib/MessageBuilder.cc
index 7d8d8cb..b33394e 100644
--- a/lib/MessageBuilder.cc
+++ b/lib/MessageBuilder.cc
@@ -20,9 +20,9 @@
#include <memory>
#include <stdexcept>
-#include <string>
#include <utility>
+#include "KeyValueImpl.h"
#include "LogUtils.h"
#include "MessageImpl.h"
#include "ObjectPool.h"
@@ -80,6 +80,11 @@ MessageBuilder& MessageBuilder::setContent(std::string&&
data) {
return *this;
}
+MessageBuilder& MessageBuilder::setContent(const KeyValue& data) {
+ impl_->keyValuePtr = data.impl_;
+ return *this;
+}
+
MessageBuilder& MessageBuilder::setProperty(const std::string& name, const
std::string& value) {
checkMetadata();
proto::KeyValue* keyValue = proto::KeyValue().New();
diff --git a/lib/MessageImpl.cc b/lib/MessageImpl.cc
index 5d1edbf..63232e7 100644
--- a/lib/MessageImpl.cc
+++ b/lib/MessageImpl.cc
@@ -102,4 +102,39 @@ void MessageImpl::setSchemaVersion(const std::string&
schemaVersion) { schemaVer
const std::string& MessageImpl::getSchemaVersion() const { return
metadata.schema_version(); }
+void MessageImpl::convertKeyValueToPayload(const pulsar::SchemaInfo&
schemaInfo) {
+ if (schemaInfo.getSchemaType() != KEY_VALUE) {
+ // ignore not key_value schema.
+ return;
+ }
+ KeyValueEncodingType keyValueEncodingType =
getKeyValueEncodingType(schemaInfo);
+ payload = keyValuePtr->getContent(keyValueEncodingType);
+ if (keyValueEncodingType == KeyValueEncodingType::SEPARATED) {
+ setPartitionKey(keyValuePtr->getKey());
+ }
+}
+
+void MessageImpl::convertPayloadToKeyValue(const pulsar::SchemaInfo&
schemaInfo) {
+ if (schemaInfo.getSchemaType() != KEY_VALUE) {
+ // ignore not key_value schema.
+ return;
+ }
+ keyValuePtr =
+ std::make_shared<KeyValueImpl>(static_cast<const
char*>(payload.data()), payload.readableBytes(),
+ getKeyValueEncodingType(schemaInfo));
+}
+
+KeyValueEncodingType MessageImpl::getKeyValueEncodingType(SchemaInfo
schemaInfo) {
+ if (schemaInfo.getSchemaType() != KEY_VALUE) {
+ throw std::invalid_argument("Schema not key value type.");
+ }
+ const StringMap& properties = schemaInfo.getProperties();
+ auto data = properties.find("kv.encoding.type");
+ if (data == properties.end()) {
+ throw std::invalid_argument("Not found kv.encoding.type by
properties");
+ } else {
+ return enumEncodingType(data->second);
+ }
+}
+
} // namespace pulsar
diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h
index 587b663..790a021 100644
--- a/lib/MessageImpl.h
+++ b/lib/MessageImpl.h
@@ -22,6 +22,7 @@
#include <pulsar/Message.h>
#include <pulsar/MessageId.h>
+#include "KeyValueImpl.h"
#include "PulsarApi.pb.h"
#include "SharedBuffer.h"
@@ -40,6 +41,7 @@ class MessageImpl {
proto::MessageMetadata metadata;
SharedBuffer payload;
+ std::shared_ptr<KeyValueImpl> keyValuePtr;
MessageId messageId;
ClientConnection* cnx_;
const std::string* topicName_;
@@ -72,6 +74,9 @@ class MessageImpl {
bool hasSchemaVersion() const;
const std::string& getSchemaVersion() const;
void setSchemaVersion(const std::string& value);
+ void convertKeyValueToPayload(const SchemaInfo& schemaInfo);
+ void convertPayloadToKeyValue(const SchemaInfo& schemaInfo);
+ KeyValueEncodingType getKeyValueEncodingType(SchemaInfo schemaInfo);
friend class PulsarWrapper;
friend class MessageBuilder;
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 1213fce..05ab13d 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -406,6 +406,8 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message&
msg, const SendCallba
return;
}
+ // Convert the payload before sending the message.
+ msg.impl_->convertKeyValueToPayload(conf_.getSchema());
const auto& uncompressedPayload = msg.impl_->payload;
const uint32_t uncompressedSize = uncompressedPayload.readableBytes();
const auto result = canEnqueueRequest(uncompressedSize);
diff --git a/lib/Schema.cc b/lib/Schema.cc
index 17a301e..300c91d 100644
--- a/lib/Schema.cc
+++ b/lib/Schema.cc
@@ -19,16 +19,58 @@
#include <pulsar/Schema.h>
#include <pulsar/defines.h>
+#include <boost/property_tree/json_parser.hpp>
+#include <boost/property_tree/ptree.hpp>
#include <iostream>
#include <map>
#include <memory>
+#include "SharedBuffer.h"
+using boost::property_tree::ptree;
+using boost::property_tree::read_json;
+using boost::property_tree::write_json;
+
PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::SchemaType
schemaType) {
return s << strSchemaType(schemaType);
}
+PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s,
pulsar::KeyValueEncodingType encodingType) {
+ return s << strEncodingType(encodingType);
+}
+
namespace pulsar {
+static const std::string KEY_SCHEMA_NAME = "key.schema.name";
+static const std::string KEY_SCHEMA_TYPE = "key.schema.type";
+static const std::string KEY_SCHEMA_PROPS = "key.schema.properties";
+static const std::string VALUE_SCHEMA_NAME = "value.schema.name";
+static const std::string VALUE_SCHEMA_TYPE = "value.schema.type";
+static const std::string VALUE_SCHEMA_PROPS = "value.schema.properties";
+static const std::string KV_ENCODING_TYPE = "kv.encoding.type";
+
+PULSAR_PUBLIC const char *strEncodingType(KeyValueEncodingType encodingType) {
+ switch (encodingType) {
+ case KeyValueEncodingType::INLINE:
+ return "INLINE";
+ case KeyValueEncodingType::SEPARATED:
+ return "SEPARATED";
+ };
+ // NOTE : Do not add default case in the switch above. In future if we get
new cases for
+ // Schema and miss them in the switch above we would like to get notified.
Adding
+ // return here to make the compiler happy.
+ return "UnknownSchemaType";
+}
+
+PULSAR_PUBLIC KeyValueEncodingType enumEncodingType(std::string
encodingTypeStr) {
+ if (encodingTypeStr == "INLINE") {
+ return KeyValueEncodingType::INLINE;
+ } else if (encodingTypeStr == "SEPARATED") {
+ return KeyValueEncodingType::SEPARATED;
+ } else {
+ throw std::invalid_argument("No match encoding type: " +
encodingTypeStr);
+ }
+}
+
PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType) {
switch (schemaType) {
case NONE:
@@ -90,6 +132,45 @@ SchemaInfo::SchemaInfo(SchemaType schemaType, const
std::string &name, const std
const StringMap &properties)
: impl_(std::make_shared<SchemaInfoImpl>(schemaType, name, schema,
properties)) {}
+SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo
&valueSchema,
+ const KeyValueEncodingType &keyValueEncodingType) {
+ std::string keySchemaStr = keySchema.getSchema();
+ std::string valueSchemaStr = valueSchema.getSchema();
+ uint32_t keySize = keySchemaStr.size();
+ uint32_t valueSize = valueSchemaStr.size();
+
+ auto buffSize = sizeof keySize + keySize + sizeof valueSize + valueSize;
+ SharedBuffer buffer = SharedBuffer::allocate(buffSize);
+ buffer.writeUnsignedInt(keySize == 0 ? INVALID_SIZE :
static_cast<uint32_t>(keySize));
+ buffer.write(keySchemaStr.c_str(), static_cast<uint32_t>(keySize));
+ buffer.writeUnsignedInt(valueSize == 0 ? INVALID_SIZE :
static_cast<uint32_t>(valueSize));
+ buffer.write(valueSchemaStr.c_str(), static_cast<uint32_t>(valueSize));
+
+ auto writeJson = [](const StringMap &properties) {
+ ptree pt;
+ for (auto &entry : properties) {
+ pt.put(entry.first, entry.second);
+ }
+ std::ostringstream buf;
+ write_json(buf, pt, false);
+ auto s = buf.str();
+ s.pop_back();
+ return s;
+ };
+
+ StringMap properties;
+ properties.emplace(KEY_SCHEMA_NAME, keySchema.getName());
+ properties.emplace(KEY_SCHEMA_TYPE,
strSchemaType(keySchema.getSchemaType()));
+ properties.emplace(KEY_SCHEMA_PROPS, writeJson(keySchema.getProperties()));
+ properties.emplace(VALUE_SCHEMA_NAME, valueSchema.getName());
+ properties.emplace(VALUE_SCHEMA_TYPE,
strSchemaType(valueSchema.getSchemaType()));
+ properties.emplace(VALUE_SCHEMA_PROPS,
writeJson(valueSchema.getProperties()));
+ properties.emplace(KV_ENCODING_TYPE,
strEncodingType(keyValueEncodingType));
+
+ impl_ = std::make_shared<SchemaInfoImpl>(KEY_VALUE, "KeyValue",
std::string(buffer.data(), buffSize),
+ properties);
+}
+
SchemaType SchemaInfo::getSchemaType() const { return impl_->type_; }
const std::string &SchemaInfo::getName() const { return impl_->name_; }
diff --git a/tests/KeyValueImplTest.cc b/tests/KeyValueImplTest.cc
new file mode 100644
index 0000000..89770c4
--- /dev/null
+++ b/tests/KeyValueImplTest.cc
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <KeyValueImpl.h>
+#include <gtest/gtest.h>
+
+using namespace pulsar;
+
+TEST(KeyValueTest, testEncodeAndDeCode) {
+ const char* keyContent = "keyContent";
+ const char* valueContent = "valueContent";
+
+ {
+ // test inline encode
+ KeyValueImpl keyValue(keyContent, valueContent);
+ const SharedBuffer content =
keyValue.getContent(KeyValueEncodingType::INLINE);
+ ASSERT_EQ(content.readableBytes(), 8 + strlen(keyContent) +
strlen(valueContent));
+
+ // test inline decode
+ KeyValueImpl deCodeKeyValue(content.data(), content.readableBytes(),
KeyValueEncodingType::INLINE);
+ const SharedBuffer deCodeContent =
deCodeKeyValue.getContent(KeyValueEncodingType::INLINE);
+ ASSERT_EQ(deCodeKeyValue.getKey(), keyContent);
+ ASSERT_EQ(deCodeKeyValue.getValueAsString(), valueContent);
+ ASSERT_TRUE(std::string(deCodeContent.data(),
deCodeContent.readableBytes()).compare(valueContent) !=
+ 0);
+ }
+
+ {
+ // test separated encode
+ KeyValueImpl sepKeyValue(keyContent, valueContent);
+ const SharedBuffer content =
sepKeyValue.getContent(KeyValueEncodingType::SEPARATED);
+ ASSERT_EQ(sepKeyValue.getKey(), keyContent);
+ ASSERT_EQ(sepKeyValue.getValueAsString(), valueContent);
+ ASSERT_EQ(std::string(content.data(), content.readableBytes()),
valueContent);
+
+ // test separated decode
+ KeyValueImpl sepDeKeyValue(content.data(), content.readableBytes(),
KeyValueEncodingType::SEPARATED);
+ const SharedBuffer deCodeContent =
sepKeyValue.getContent(KeyValueEncodingType::SEPARATED);
+ ASSERT_EQ(sepDeKeyValue.getKey(), "");
+ ASSERT_EQ(sepDeKeyValue.getValueAsString(), valueContent);
+ ASSERT_EQ(std::string(deCodeContent.data(),
deCodeContent.readableBytes()), valueContent);
+ }
+}
+
+TEST(KeyValueTest, testKeyIsEmpty) {
+ const char* keyContent = "";
+ const char* valueContent = "valueContent";
+
+ {
+ // test inline encode
+ KeyValueImpl keyValue(keyContent, valueContent);
+ const SharedBuffer content =
keyValue.getContent(KeyValueEncodingType::INLINE);
+ ASSERT_EQ(content.readableBytes(), 8 + strlen(keyContent) +
strlen(valueContent));
+
+ // test inline decode
+ KeyValueImpl deCodeKeyValue(content.data(), content.readableBytes(),
KeyValueEncodingType::INLINE);
+ const SharedBuffer deCodeContent =
deCodeKeyValue.getContent(KeyValueEncodingType::INLINE);
+ ASSERT_EQ(deCodeKeyValue.getKey(), keyContent);
+ ASSERT_EQ(deCodeKeyValue.getValueAsString(), valueContent);
+ ASSERT_TRUE(std::string(deCodeContent.data(),
deCodeContent.readableBytes()).compare(valueContent) !=
+ 0);
+ }
+
+ {
+ // test separated type
+ KeyValueImpl sepKeyValue(keyContent, valueContent);
+ const SharedBuffer content =
sepKeyValue.getContent(KeyValueEncodingType::SEPARATED);
+ ASSERT_EQ(sepKeyValue.getKey(), keyContent);
+ ASSERT_EQ(sepKeyValue.getValueAsString(), valueContent);
+ ASSERT_EQ(std::string(content.data(), content.readableBytes()),
valueContent);
+ }
+}
+
+TEST(KeyValueTest, testValueIsEmpty) {
+ const char* keyContent = "keyContent";
+ const char* valueContent = "";
+
+ {
+ // test inline encode
+ KeyValueImpl keyValue(keyContent, valueContent);
+ const SharedBuffer content =
keyValue.getContent(KeyValueEncodingType::INLINE);
+ ASSERT_EQ(content.readableBytes(), 8 + strlen(keyContent) +
strlen(valueContent));
+
+ // test inline decode
+ KeyValueImpl deCodeKeyValue(content.data(), content.readableBytes(),
KeyValueEncodingType::INLINE);
+ const SharedBuffer deCodeContent =
keyValue.getContent(KeyValueEncodingType::INLINE);
+ ASSERT_EQ(deCodeKeyValue.getKey(), keyContent);
+ ASSERT_EQ(deCodeKeyValue.getValueAsString(), valueContent);
+ ASSERT_NE(std::string(deCodeContent.data(),
deCodeContent.readableBytes()), valueContent);
+ }
+
+ {
+ // test separated type
+ KeyValueImpl sepKeyValue(keyContent, valueContent);
+ const SharedBuffer content =
sepKeyValue.getContent(KeyValueEncodingType::SEPARATED);
+ ASSERT_EQ(sepKeyValue.getKey(), keyContent);
+ ASSERT_EQ(sepKeyValue.getValueAsString(), valueContent);
+ ASSERT_EQ(std::string(content.data(), content.readableBytes()),
valueContent);
+ }
+}
diff --git a/tests/KeyValueSchemaTest.cc b/tests/KeyValueSchemaTest.cc
new file mode 100644
index 0000000..02b55ee
--- /dev/null
+++ b/tests/KeyValueSchemaTest.cc
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include "lib/LogUtils.h"
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+class KeyValueSchemaTest : public
::testing::TestWithParam<KeyValueEncodingType> {
+ public:
+ void TearDown() override { client.close(); }
+
+ void createProducer(const std::string& topic, Producer& producer) {
+ ProducerConfiguration configProducer;
+ configProducer.setSchema(getKeyValueSchema());
+ configProducer.setBatchingEnabled(false);
+ ASSERT_EQ(ResultOk, client.createProducer(topic, configProducer,
producer));
+ }
+
+ void createConsumer(const std::string& topic, Consumer& consumer) {
+ ConsumerConfiguration configConsumer;
+ configConsumer.setSchema(getKeyValueSchema());
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-kv", configConsumer,
consumer));
+ }
+
+ SchemaInfo getKeyValueSchema() {
+ SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+ SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+ return SchemaInfo(keySchema, valueSchema, GetParam());
+ }
+
+ private:
+ Client client{lookupUrl};
+ std::string jsonSchema =
+
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+};
+
+TEST_P(KeyValueSchemaTest, testKeyValueSchema) {
+ auto encodingType = GetParam();
+ const std::string topicName =
+ "testKeyValueSchema-" + std::string(strEncodingType(encodingType)) +
std::to_string(time(nullptr));
+
+ Producer producer;
+ createProducer(topicName, producer);
+ Consumer consumer;
+ createConsumer(topicName, consumer);
+
+ // Sending and receiving messages.
+ std::string keyData = "{\"re\":2.1,\"im\":1.23}";
+ std::string valueData = "{\"re\":2.1,\"im\":1.23}";
+ KeyValue keyValue((std::string(keyData)), std::string(valueData));
+ Message msg = MessageBuilder().setContent(keyValue).setProperty("x",
"1").build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+
+ Message receiveMsg;
+ consumer.receive(receiveMsg);
+ KeyValue keyValueData = receiveMsg.getKeyValueData();
+
+ if (encodingType == pulsar::KeyValueEncodingType::INLINE) {
+ ASSERT_EQ(receiveMsg.getPartitionKey(), "");
+ ASSERT_EQ(keyValueData.getKey(), keyData);
+ } else {
+ ASSERT_EQ(receiveMsg.getPartitionKey(), keyData);
+ ASSERT_EQ(keyValueData.getKey(), "");
+ }
+ ASSERT_EQ(keyValueData.getValueAsString(), valueData);
+}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, KeyValueSchemaTest,
+ ::testing::Values(KeyValueEncodingType::INLINE,
KeyValueEncodingType::SEPARATED));
diff --git a/tests/MessageTest.cc b/tests/MessageTest.cc
index 7e26431..3dfe221 100644
--- a/tests/MessageTest.cc
+++ b/tests/MessageTest.cc
@@ -22,7 +22,7 @@
#include <string>
-#include "lib/LogUtils.h"
+#include "MessageImpl.h"
using namespace pulsar;
TEST(MessageTest, testMessageContents) {
@@ -101,3 +101,49 @@ TEST(MessageTest, testMessageBuilder) {
ASSERT_EQ(msg.getData(), originalAddress);
}
}
+
+TEST(MessageTest, testMessageImplKeyValuePayloadCovert) {
+ const char* keyContent = "keyContent";
+ const char* valueContent = "valueContent";
+
+ std::string jsonSchema =
+
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+ SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+ SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+
+ // test inline encoding type.
+ {
+ SchemaInfo keyValueSchema(keySchema, valueSchema,
KeyValueEncodingType::INLINE);
+ MessageImpl msgImpl;
+ std::shared_ptr<KeyValueImpl> keyValuePtr =
std::make_shared<KeyValueImpl>(keyContent, valueContent);
+ msgImpl.keyValuePtr = keyValuePtr;
+ msgImpl.convertKeyValueToPayload(keyValueSchema);
+ ASSERT_EQ(msgImpl.payload.readableBytes(), 30);
+ ASSERT_EQ(msgImpl.getPartitionKey(), "");
+
+ MessageImpl deMsgImpl;
+ deMsgImpl.payload = msgImpl.payload;
+ deMsgImpl.convertPayloadToKeyValue(keyValueSchema);
+
+ ASSERT_EQ(deMsgImpl.keyValuePtr->getKey(), keyContent);
+ ASSERT_EQ(deMsgImpl.keyValuePtr->getValueAsString(), valueContent);
+ }
+
+ // test separated encoding type.
+ {
+ SchemaInfo keyValueSchema(keySchema, valueSchema,
KeyValueEncodingType::SEPARATED);
+ MessageImpl msgImpl;
+ std::shared_ptr<KeyValueImpl> keyValuePtr =
std::make_shared<KeyValueImpl>(keyContent, valueContent);
+ msgImpl.keyValuePtr = keyValuePtr;
+ msgImpl.convertKeyValueToPayload(keyValueSchema);
+ ASSERT_EQ(msgImpl.payload.readableBytes(), 12);
+ ASSERT_EQ(msgImpl.getPartitionKey(), keyContent);
+
+ MessageImpl deMsgImpl;
+ deMsgImpl.payload = msgImpl.payload;
+ deMsgImpl.convertPayloadToKeyValue(keyValueSchema);
+
+ ASSERT_EQ(deMsgImpl.keyValuePtr->getKey(), "");
+ ASSERT_EQ(deMsgImpl.keyValuePtr->getValueAsString(), valueContent);
+ }
+}
diff --git a/tests/SchemaTest.cc b/tests/SchemaTest.cc
index f153652..cf0f8c7 100644
--- a/tests/SchemaTest.cc
+++ b/tests/SchemaTest.cc
@@ -19,13 +19,14 @@
#include <gtest/gtest.h>
#include <pulsar/Client.h>
+#include "SharedBuffer.h"
+
using namespace pulsar;
static std::string lookupUrl = "pulsar://localhost:6650";
static const std::string exampleSchema =
- "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
-
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+
R"({"type":"record","name":"Example","namespace":"test","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})";
TEST(SchemaTest, testSchema) {
ClientConfiguration config;
@@ -107,3 +108,49 @@ TEST(SchemaTest, testHasSchemaVersion) {
client.close();
}
+
+TEST(SchemaTest, testKeyValueSchema) {
+ SchemaInfo keySchema(SchemaType::AVRO, "String", exampleSchema);
+ SchemaInfo valueSchema(SchemaType::AVRO, "String", exampleSchema);
+ SchemaInfo keyValueSchema(keySchema, valueSchema,
KeyValueEncodingType::INLINE);
+ ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
+ ASSERT_EQ(keyValueSchema.getSchema().size(),
+ 8 + keySchema.getSchema().size() +
valueSchema.getSchema().size());
+}
+
+TEST(SchemaTest, testKeySchemaIsEmpty) {
+ SchemaInfo keySchema(SchemaType::AVRO, "String", "");
+ SchemaInfo valueSchema(SchemaType::AVRO, "String", exampleSchema);
+ SchemaInfo keyValueSchema(keySchema, valueSchema,
KeyValueEncodingType::INLINE);
+ ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
+ ASSERT_EQ(keyValueSchema.getSchema().size(),
+ 8 + keySchema.getSchema().size() +
valueSchema.getSchema().size());
+
+ SharedBuffer buffer =
SharedBuffer::wrap(const_cast<char*>(keyValueSchema.getSchema().c_str()),
+
keyValueSchema.getSchema().size());
+ int keySchemaSize = buffer.readUnsignedInt();
+ ASSERT_EQ(keySchemaSize, -1);
+ int valueSchemaSize = buffer.readUnsignedInt();
+ ASSERT_EQ(valueSchemaSize, valueSchema.getSchema().size());
+ std::string valueSchemaStr(buffer.slice(0, valueSchemaSize).data(),
valueSchemaSize);
+ ASSERT_EQ(valueSchema.getSchema(), valueSchemaStr);
+}
+
+TEST(SchemaTest, testValueSchemaIsEmpty) {
+ SchemaInfo keySchema(SchemaType::AVRO, "String", exampleSchema);
+ SchemaInfo valueSchema(SchemaType::AVRO, "String", "");
+ SchemaInfo keyValueSchema(keySchema, valueSchema,
KeyValueEncodingType::INLINE);
+ ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
+ ASSERT_EQ(keyValueSchema.getSchema().size(),
+ 8 + keySchema.getSchema().size() +
valueSchema.getSchema().size());
+
+ SharedBuffer buffer =
SharedBuffer::wrap(const_cast<char*>(keyValueSchema.getSchema().c_str()),
+
keyValueSchema.getSchema().size());
+ int keySchemaSize = buffer.readUnsignedInt();
+ ASSERT_EQ(keySchemaSize, keySchema.getSchema().size());
+ std::string keySchemaStr(buffer.slice(0, keySchemaSize).data(),
keySchemaSize);
+ ASSERT_EQ(keySchemaStr, keySchema.getSchema());
+ buffer.consume(keySchemaSize);
+ int valueSchemaSize = buffer.readUnsignedInt();
+ ASSERT_EQ(valueSchemaSize, -1);
+}