BewareMyPower commented on code in PR #17125:
URL: https://github.com/apache/pulsar/pull/17125#discussion_r976098568


##########
pulsar-client-cpp/include/pulsar/KeyValue.h:
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef KEY_VALUE_HPP_
+#define KEY_VALUE_HPP_
+
+#include <map>

Review Comment:
   ```suggestion
   ```
   
   `std::map` is not used.



##########
pulsar-client-cpp/lib/MessageImpl.cc:
##########
@@ -20,7 +20,8 @@
 
 namespace pulsar {
 
-MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), 
topicName_(), redeliveryCount_() {}
+MessageImpl::MessageImpl()
+    : metadata(), payload(), keyValuePtr(), messageId(), cnx_(0), 
topicName_(), redeliveryCount_() {}

Review Comment:
   No need to initialize `keyValuePtr` here: members of class type are 
default-constructed automatically. In fact, the previous code was already 
redundant. Only members of fundamental types (numeric types, including raw 
pointers) need explicit initialization.



##########
pulsar-client-cpp/include/pulsar/Schema.h:
##########
@@ -27,6 +27,27 @@
 
 namespace pulsar {
 
+/**
+ *  Encoding types of supported KeyValueSchema for Pulsar messages.
+ */
+enum class KeyValueEncodingType
+{
+    /**
+     * Key is stored as message key, while value is stored as message payload.
+     */
+    SEPARATED,
+
+    /**
+     * Key and value are stored as message payload.
+     */
+    INLINE
+};
+
+// Return string representation of result code
+PULSAR_PUBLIC const char *strEncodingType(pulsar::KeyValueEncodingType 
encodingType);
+
+PULSAR_PUBLIC const KeyValueEncodingType enumEncodingType(std::string 
encodingTypeStr);

Review Comment:
   ```suggestion
   PULSAR_PUBLIC const KeyValueEncodingType enumEncodingType(const std::string& 
encodingTypeStr);
   ```
   
   That said, I don't think the `enumEncodingType` method should be exposed 
to users at all. In what case would a user want to convert a string to a 
`KeyValueEncodingType`?



##########
pulsar-client-cpp/lib/MessageImpl.h:
##########
@@ -71,6 +73,9 @@ class MessageImpl {
     bool hasSchemaVersion() const;
     const std::string& getSchemaVersion() const;
     void setSchemaVersion(const std::string& value);
+    void convertKeyValueToPayload(SchemaInfo schemaInfo);
+    void convertPayloadToKeyValue(SchemaInfo schemaInfo);

Review Comment:
   Use `const SchemaInfo&` as the parameter type to avoid copying the 
`SchemaInfo` on every call.



##########
pulsar-client-cpp/tests/KeyValueImplTest.cc:
##########
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <KeyValueImpl.h>
+
+using namespace pulsar;
+
+TEST(KeyValueTest, testEncodeAndDeCode) {
+    const std::string keyContent = "keyContent";
+    const std::string valueContent = "valueContent";
+
+    {
+        // test inline encode
+        KeyValueImpl keyValue((std::string(keyContent)), 
std::string(valueContent));

Review Comment:
   I found many places in `KeyValueTest` where you wrote something like:
   
   ```
   std::string keyContent = "key";
   std::string valueContent = "value";
   KeyValueImpl keyValue(std::string(keyContent), std::string(valueContent));
   ```
   
   The code looks complicated and redundant. If you have two pre-defined 
`std::string` objects like:
   
   ```c++
   std::string keyContent = "key";
   std::string valueContent = "value";
   ```
   
   You can just use `std::move` to move these strings into the arguments of 
the `KeyValueImpl` constructor:
   
   ```c++
   KeyValueImpl keyValue(std::move(keyContent), std::move(valueContent));
   ```
   
   Since const objects cannot be moved, you should remove the `const` qualifier 
on `keyContent` and `valueContent`.
   
   If you want to reuse the two strings, you can declare them as `const char*` 
variables and let the `std::string` arguments be constructed implicitly.
   
   Here is an example that shows how I rewrote `testValueIsEmpty`:
   
   ```c++
   TEST(KeyValueTest, testValueIsEmpty) {
       const char* keyContent = "keyContent";
       const char* valueContent = "";
   
       {
           // test inline encode
           KeyValueImpl keyValue(keyContent, valueContent);
           const SharedBuffer content = 
keyValue.getContent(KeyValueEncodingType::INLINE);
           ASSERT_EQ(content.readableBytes(), 8 + strlen(keyContent) + 
strlen(valueContent));
   
           // test inline decode
           KeyValueImpl deCodeKeyValue(content.data(), content.readableBytes(), 
KeyValueEncodingType::INLINE);
           const SharedBuffer deCodeContent = 
keyValue.getContent(KeyValueEncodingType::INLINE);
           ASSERT_EQ(deCodeKeyValue.getKey(), keyContent);
           ASSERT_EQ(deCodeKeyValue.getValueAsString(), valueContent);
           ASSERT_NE(std::string(deCodeContent.data(), 
deCodeContent.readableBytes()), valueContent);
       }
   
       {
           // test separated type
           KeyValueImpl sepKeyValue(keyContent, valueContent);
           const SharedBuffer content = 
sepKeyValue.getContent(KeyValueEncodingType::SEPARATED);
           ASSERT_EQ(sepKeyValue.getKey(), keyContent);
           ASSERT_EQ(sepKeyValue.getValueAsString(), valueContent);
           ASSERT_EQ(std::string(content.data(), content.readableBytes()), 
valueContent);
       }
   }
   ```
   
   



##########
pulsar-client-cpp/lib/Schema.cc:
##########
@@ -22,13 +22,54 @@
 #include <iostream>
 #include <map>
 #include <memory>
+#include <boost/property_tree/json_parser.hpp>
+#include <boost/property_tree/ptree.hpp>
+#include "SharedBuffer.h"
+using boost::property_tree::ptree;
+using boost::property_tree::read_json;
+using boost::property_tree::write_json;
 
 PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::SchemaType 
schemaType) {
     return s << strSchemaType(schemaType);
 }
 
+PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, 
pulsar::KeyValueEncodingType encodingType) {
+    return s << strEncodingType(encodingType);
+}
+
 namespace pulsar {
 
+static const std::string KEY_SCHEMA_NAME = "key.schema.name";
+static const std::string KEY_SCHEMA_TYPE = "key.schema.type";
+static const std::string KEY_SCHEMA_PROPS = "key.schema.properties";
+static const std::string VALUE_SCHEMA_NAME = "value.schema.name";
+static const std::string VALUE_SCHEMA_TYPE = "value.schema.type";
+static const std::string VALUE_SCHEMA_PROPS = "value.schema.properties";
+static const std::string KV_ENCODING_TYPE = "kv.encoding.type";
+
+PULSAR_PUBLIC const char *strEncodingType(KeyValueEncodingType encodingType) {
+    switch (encodingType) {
+        case KeyValueEncodingType::INLINE:
+            return "INLINE";
+        case KeyValueEncodingType::SEPARATED:
+            return "SEPARATED";
+    };
+    // NOTE : Do not add default case in the switch above. In future if we get 
new cases for
+    // Schema and miss them in the switch above we would like to get notified. 
Adding
+    // return here to make the compiler happy.
+    return "UnknownSchemaType";
+}
+
+PULSAR_PUBLIC const KeyValueEncodingType enumEncodingType(std::string 
encodingTypeStr) {
+    if (encodingTypeStr.compare("INLINE") == 0) {
+        return KeyValueEncodingType::INLINE;
+    } else if (encodingTypeStr.compare("SEPARATED") == 0) {

Review Comment:
   This is not a required change, but in C++ you can use `==` to check that 
two strings have equal contents (unlike Java, which requires `equals`).



##########
pulsar-client-cpp/tests/BasicEndToEndTest.cc:
##########
@@ -4104,3 +4104,89 @@ TEST(BasicEndToEndTest, 
testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+TEST(BasicEndToEndTest, testKeyValueSchemaWithInline) {
+    const std::string topicName = "testKeyValueSchemaInline" + 
std::to_string(time(nullptr));
+    const std::string subName = "sub-key-value-schema-inline";
+
+    std::string jsonSchema =
+        
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+    SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+    SchemaInfo keyValueSchema(keySchema, valueSchema, 
KeyValueEncodingType::INLINE);
+    LOG_INFO("KeyValue schema content: " << keyValueSchema.getSchema());
+
+    // Setup client, producer and consumer.
+    Client client(lookupUrl);
+
+    Producer producer;
+    ProducerConfiguration configProducer;
+    configProducer.setSchema(keyValueSchema);
+    configProducer.setBatchingEnabled(false);
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, configProducer, 
producer));
+
+    Consumer consumer;
+    ConsumerConfiguration configConsumer;
+    configConsumer.setSchema(keyValueSchema);
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, configConsumer, 
consumer));
+
+    // Sending and receiving messages.
+    std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
+    KeyValue keyValue((std::string(jsonData)), std::string(jsonData));
+    Message msg = MessageBuilder().setContent(keyValue).setProperty("x", 
"1").build();
+    ASSERT_EQ(ResultOk, producer.send(msg));
+
+    Message receiveMsg;
+    consumer.receive(receiveMsg);
+    KeyValue keyValueData = receiveMsg.getKeyValueData();
+    ASSERT_EQ(receiveMsg.getPartitionKey(), "");
+    ASSERT_EQ(keyValueData.getKey(), jsonData);
+    ASSERT_EQ(keyValueData.getValueAsString(), jsonData);
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testKeyValueSchemaWithSeparated) {
+    const std::string topicName = "testKeyValueSchemaSeparated" + 
std::to_string(time(nullptr));
+    const std::string subName = "sub-key-value-schema-separated";
+
+    std::string jsonSchema =
+        
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+    SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+    SchemaInfo keyValueSchema(keySchema, valueSchema, 
KeyValueEncodingType::SEPARATED);

Review Comment:
   This code is nearly the same as `testKeyValueSchemaWithInline`; you can 
use a value-parameterized test (`TEST_P`) to remove the duplication. See how 
`MessageChunkingTest` handles it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to