This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d81bb184abaa7a4b97375327c25c08df43e13b09
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Apr 8 12:35:27 2022 +0800

    [C++] Fix single message metadata not set correctly (#15072)
    
    ### Motivation
    
    Recently I found that messages sent by the C++ producer don't have the
    schema version, which prevents a Java consumer from consuming them with
    the `AUTO_CONSUME` schema. After rechecking the code, I found that the C++
    client doesn't set the single message metadata correctly, i.e. when
    batching is enabled, some messages' metadata could be wrong.
    
    - In `initBatchMessageMetadata`, the schema version is not set.
    - In `serializeSingleMessageInBatchWithPayload`, the ordering key and
      the sequence id are not set.
    
    In addition, when a C++ consumer consumes batched messages from a Java
    producer, some metadata might be wrong, because even for batched
    messages the Java producer also sets the partition key and the ordering
    key in `MessageMetadata`. This is redundant because only the keys in
    `SingleMessageMetadata` should be set. To prevent the second and later
    single messages in a batch from reusing the keys in `MessageMetadata`,
    the Java client clears these fields when constructing a `MessageImpl` if
    the `SingleMessageMetadata` doesn't contain them.
    
    ### Modifications
    
    - Set the fields that were not set before when creating a batch. Some
      fields like `null_value` and `null_partition_key` are not set because
      they are not supported by the C++ client at the moment.
    - Use a more efficient way to copy the repeated fields of ProtoBuf.
    - When creating a `MessageImpl`, clear the fields that the
      `SingleMessageMetadata` object does not contain, so that the behavior
      is consistent with the Java client.
    
    ### Verifying this change
    
    Following tests are added:
    - `BatchMessageTest.testSingleMessageMetadata`: test that 3 single
      messages in a batch are consumed successfully, i.e. that the correct
      metadata is received.
    - `SchemaTest.testHasSchemaVersion`: test that when a schema is
      configured, all messages have the schema version.
    - The validation for schema version is also added to
      `ProtobufNativeSchemaTest.testEndToEnd`.
    
    (cherry picked from commit 6f41fdebf642b08e2c3f45f3574963ec0936868c)
    
    Resolved the conflicts by deleting `ProtobufNativeSchemaTest.cc` because
    it only exists in branch-2.9 and later.
---
 pulsar-client-cpp/lib/Commands.cc           | 31 ++++++++----
 pulsar-client-cpp/lib/Message.cc            | 26 ++++++++++
 pulsar-client-cpp/tests/BatchMessageTest.cc | 78 +++++++++++++++++++++++++++++
 pulsar-client-cpp/tests/SchemaTest.cc       | 34 +++++++++++++
 4 files changed, 158 insertions(+), 11 deletions(-)

diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 95e0cc792b6..48beb98bfbd 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -677,26 +677,35 @@ void Commands::initBatchMessageMetadata(const Message& 
msg, pulsar::proto::Messa
             batchMetadata.add_replicate_to(metadata.replicate_to(i));
         }
     }
-    // TODO: set other optional fields
+    if (metadata.has_schema_version()) {
+        batchMetadata.set_schema_version(metadata.schema_version());
+    }
 }
 
 uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& 
msg, SharedBuffer& batchPayLoad,
                                                             unsigned long 
maxMessageSizeInBytes) {
+    const auto& msgMetadata = msg.impl_->metadata;
     SingleMessageMetadata metadata;
-    if (msg.impl_->hasPartitionKey()) {
-        metadata.set_partition_key(msg.impl_->getPartitionKey());
+    if (msgMetadata.has_partition_key()) {
+        metadata.set_partition_key(msgMetadata.partition_key());
+    }
+    if (msgMetadata.has_ordering_key()) {
+        metadata.set_ordering_key(msgMetadata.ordering_key());
     }
 
-    for (MessageBuilder::StringMap::const_iterator it = 
msg.impl_->properties().begin();
-         it != msg.impl_->properties().end(); it++) {
-        proto::KeyValue* keyValue = proto::KeyValue().New();
-        keyValue->set_key(it->first);
-        keyValue->set_value(it->second);
+    metadata.mutable_properties()->Reserve(msgMetadata.properties_size());
+    for (int i = 0; i < msgMetadata.properties_size(); i++) {
+        auto keyValue = proto::KeyValue().New();
+        *keyValue = msgMetadata.properties(i);
         metadata.mutable_properties()->AddAllocated(keyValue);
     }
 
-    if (msg.impl_->getEventTimestamp() != 0) {
-        metadata.set_event_time(msg.impl_->getEventTimestamp());
+    if (msgMetadata.has_event_time()) {
+        metadata.set_event_time(msgMetadata.event_time());
+    }
+
+    if (msgMetadata.has_sequence_id()) {
+        metadata.set_sequence_id(msgMetadata.sequence_id());
     }
 
     // Format of batch message
@@ -726,7 +735,7 @@ uint64_t 
Commands::serializeSingleMessageInBatchWithPayload(const Message& msg,
     batchPayLoad.bytesWritten(msgMetadataSize);
     batchPayLoad.write(msg.impl_->payload.data(), payloadSize);
 
-    return msg.impl_->metadata.sequence_id();
+    return msgMetadata.sequence_id();
 }
 
 Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, 
int32_t batchIndex) {
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 76e408ffef4..b928945cfae 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -79,12 +79,38 @@ Message::Message(const MessageId& messageID, 
proto::MessageMetadata& metadata, S
     
impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
     impl_->topicName_ = &topicName;
 
+    impl_->metadata.clear_properties();
+    if (singleMetadata.properties_size() > 0) {
+        
impl_->metadata.mutable_properties()->Reserve(singleMetadata.properties_size());
+        for (int i = 0; i < singleMetadata.properties_size(); i++) {
+            auto keyValue = proto::KeyValue().New();
+            *keyValue = singleMetadata.properties(i);
+            impl_->metadata.mutable_properties()->AddAllocated(keyValue);
+        }
+    }
+
     if (singleMetadata.has_partition_key()) {
         impl_->metadata.set_partition_key(singleMetadata.partition_key());
+    } else {
+        impl_->metadata.clear_partition_key();
+    }
+
+    if (singleMetadata.has_ordering_key()) {
+        impl_->metadata.set_ordering_key(singleMetadata.ordering_key());
+    } else {
+        impl_->metadata.clear_ordering_key();
     }
 
     if (singleMetadata.has_event_time()) {
         impl_->metadata.set_event_time(singleMetadata.event_time());
+    } else {
+        impl_->metadata.clear_event_time();
+    }
+
+    if (singleMetadata.has_sequence_id()) {
+        impl_->metadata.set_sequence_id(singleMetadata.sequence_id());
+    } else {
+        impl_->metadata.clear_sequence_id();
     }
 }
 
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc 
b/pulsar-client-cpp/tests/BatchMessageTest.cc
index ebe6bf159ba..62fd5fff25c 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -1071,3 +1071,81 @@ TEST(BatchMessageTest, testProducerQueueWithBatches) {
 
     ASSERT_EQ(rejectedMessges, 10);
 }
+
+TEST(BatchMessageTest, testSingleMessageMetadata) {
+    const auto topic = "BatchMessageTest-SingleMessageMetadata-" + 
std::to_string(time(nullptr));
+    constexpr int numMessages = 3;
+
+    Client client(lookupUrl);
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(
+                            topic, 
ProducerConfiguration().setBatchingMaxMessages(numMessages), producer));
+
+    producer.sendAsync(MessageBuilder()
+                           .setContent("msg-0")
+                           .setPartitionKey("key-0")
+                           .setOrderingKey("ordering-key-0")
+                           .setEventTimestamp(10UL)
+                           .setProperty("k0", "v0")
+                           .setProperty("k1", "v1")
+                           .build(),
+                       nullptr);
+    producer.sendAsync(MessageBuilder()
+                           .setContent("msg-1")
+                           .setOrderingKey("ordering-key-1")
+                           .setEventTimestamp(11UL)
+                           .setProperty("k2", "v2")
+                           .build(),
+                       nullptr);
+    producer.sendAsync(MessageBuilder().setContent("msg-2").build(), nullptr);
+    ASSERT_EQ(ResultOk, producer.flush());
+
+    Message msgs[numMessages];
+    for (int i = 0; i < numMessages; i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+        msgs[i] = msg;
+        LOG_INFO("message " << i << ": " << msg.getDataAsString()
+                            << ", key: " << (msg.hasPartitionKey() ? 
msg.getPartitionKey() : "(null)")
+                            << ", ordering key: " << (msg.hasOrderingKey() ? 
msg.getOrderingKey() : "(null)")
+                            << ", event time: " << (msg.getEventTimestamp())
+                            << ", properties count: " << 
msg.getProperties().size()
+                            << ", has schema version: " << 
msg.hasSchemaVersion());
+    }
+
+    ASSERT_EQ(msgs[0].getDataAsString(), "msg-0");
+    ASSERT_TRUE(msgs[0].hasPartitionKey());
+    ASSERT_EQ(msgs[0].getPartitionKey(), "key-0");
+    ASSERT_TRUE(msgs[0].hasOrderingKey());
+    ASSERT_EQ(msgs[0].getOrderingKey(), "ordering-key-0");
+    ASSERT_EQ(msgs[0].getEventTimestamp(), 10UL);
+    ASSERT_EQ(msgs[0].getProperties().size(), 2);
+    ASSERT_TRUE(msgs[0].hasProperty("k0"));
+    ASSERT_EQ(msgs[0].getProperty("k0"), "v0");
+    ASSERT_TRUE(msgs[0].hasProperty("k1"));
+    ASSERT_EQ(msgs[0].getProperty("k1"), "v1");
+    ASSERT_FALSE(msgs[0].hasSchemaVersion());
+
+    ASSERT_EQ(msgs[1].getDataAsString(), "msg-1");
+    ASSERT_FALSE(msgs[1].hasPartitionKey());
+    ASSERT_TRUE(msgs[1].hasOrderingKey());
+    ASSERT_EQ(msgs[1].getOrderingKey(), "ordering-key-1");
+    ASSERT_EQ(msgs[1].getEventTimestamp(), 11UL);
+    ASSERT_EQ(msgs[1].getProperties().size(), 1);
+    ASSERT_TRUE(msgs[1].hasProperty("k2"));
+    ASSERT_EQ(msgs[1].getProperty("k2"), "v2");
+    ASSERT_FALSE(msgs[1].hasSchemaVersion());
+
+    ASSERT_EQ(msgs[2].getDataAsString(), "msg-2");
+    ASSERT_FALSE(msgs[2].hasPartitionKey());
+    ASSERT_FALSE(msgs[2].hasOrderingKey());
+    ASSERT_EQ(msgs[2].getEventTimestamp(), 0UL);
+    ASSERT_EQ(msgs[2].getProperties().size(), 0);
+    ASSERT_FALSE(msgs[2].hasSchemaVersion());
+
+    client.close();
+}
diff --git a/pulsar-client-cpp/tests/SchemaTest.cc 
b/pulsar-client-cpp/tests/SchemaTest.cc
index e11069fb06b..45491b9cf81 100644
--- a/pulsar-client-cpp/tests/SchemaTest.cc
+++ b/pulsar-client-cpp/tests/SchemaTest.cc
@@ -70,3 +70,37 @@ TEST(SchemaTest, testSchema) {
     res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
     ASSERT_EQ(ResultIncompatibleSchema, res);
 }
+
+TEST(SchemaTest, testHasSchemaVersion) {
+    Client client(lookupUrl);
+    std::string topic = "SchemaTest-HasSchemaVersion";
+    SchemaInfo stringSchema(SchemaType::STRING, "String", "");
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic + "1", "sub", 
ConsumerConfiguration().setSchema(stringSchema),
+                                         consumer));
+    Producer batchedProducer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic + "1", 
ProducerConfiguration().setSchema(stringSchema),
+                                              batchedProducer));
+    Producer nonBatchedProducer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic + "1", 
ProducerConfiguration().setSchema(stringSchema),
+                                              nonBatchedProducer));
+
+    ASSERT_EQ(ResultOk, 
batchedProducer.send(MessageBuilder().setContent("msg-0").build()));
+    ASSERT_EQ(ResultOk, 
nonBatchedProducer.send(MessageBuilder().setContent("msg-1").build()));
+
+    Message msgs[2];
+    ASSERT_EQ(ResultOk, consumer.receive(msgs[0], 3000));
+    ASSERT_EQ(ResultOk, consumer.receive(msgs[1], 3000));
+
+    std::string schemaVersion(8, '\0');
+    ASSERT_EQ(msgs[0].getDataAsString(), "msg-0");
+    ASSERT_TRUE(msgs[0].hasSchemaVersion());
+    ASSERT_EQ(msgs[0].getSchemaVersion(), schemaVersion);
+
+    ASSERT_EQ(msgs[1].getDataAsString(), "msg-1");
+    ASSERT_TRUE(msgs[1].hasSchemaVersion());
+    ASSERT_EQ(msgs[1].getSchemaVersion(), schemaVersion);
+
+    client.close();
+}

Reply via email to