This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d81bb184abaa7a4b97375327c25c08df43e13b09 Author: Yunze Xu <[email protected]> AuthorDate: Fri Apr 8 12:35:27 2022 +0800 [C++] Fix single message metadata not set correctly (#15072) ### Motivation Recently I found that the messages sent by the C++ producer don't have the schema version, which prevents the Java consumer from consuming them with the `AUTO_CONSUME` schema. After rechecking the code, I found the C++ client doesn't set single message metadata correctly, i.e. when batching is enabled, some messages' metadata could be wrong. - In `initBatchMessageMetadata`, the schema version is not set. - In `serializeSingleMessageInBatchWithPayload`, the ordering key and the sequence id are not set. In addition, when a C++ consumer consumes batched messages from a Java producer, some metadata might be wrong, because even for batched messages the Java producer also sets the partition key and the ordering key. It's redundant because only keys in `SingleMessageMetadata` should be set. To prevent the 2nd and later single messages in the batch from reusing the keys in `MessageMetadata`, the Java client clears these fields if the `SingleMessageMetadata` doesn't contain them when a `MessageImpl` is constructed. ### Modifications - Set the fields that were not set before when creating a batch. Some fields like `null_value` and `null_partition_key` are not set because they are not supported by the C++ client at this moment. - Use a more efficient way to copy the repeated fields of ProtoBuf. - Clear some fields when they are not contained by the `SingleMessageMetadata` object when creating a `MessageImpl` so that the behavior is consistent with the Java client. ### Verifying this change The following tests are added: - `BatchMessageTest.testSingleMessageMetadata`: test that 3 single messages in a batch are consumed successfully, i.e. the correct metadata is received. - `SchemaTest.testHasSchemaVersion`: test that when a schema is configured, all messages have the schema version. 
- The validation for schema version is also added to `ProtobufNativeSchemaTest.testEndToEnd`. (cherry picked from commit 6f41fdebf642b08e2c3f45f3574963ec0936868c) Resolve the conflicts by deleting `ProtobufNativeSchemaTest.cc` because it only exists from branch-2.9. --- pulsar-client-cpp/lib/Commands.cc | 31 ++++++++---- pulsar-client-cpp/lib/Message.cc | 26 ++++++++++ pulsar-client-cpp/tests/BatchMessageTest.cc | 78 +++++++++++++++++++++++++++++ pulsar-client-cpp/tests/SchemaTest.cc | 34 +++++++++++++ 4 files changed, 158 insertions(+), 11 deletions(-) diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 95e0cc792b6..48beb98bfbd 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -677,26 +677,35 @@ void Commands::initBatchMessageMetadata(const Message& msg, pulsar::proto::Messa batchMetadata.add_replicate_to(metadata.replicate_to(i)); } } - // TODO: set other optional fields + if (metadata.has_schema_version()) { + batchMetadata.set_schema_version(metadata.schema_version()); + } } uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad, unsigned long maxMessageSizeInBytes) { + const auto& msgMetadata = msg.impl_->metadata; SingleMessageMetadata metadata; - if (msg.impl_->hasPartitionKey()) { - metadata.set_partition_key(msg.impl_->getPartitionKey()); + if (msgMetadata.has_partition_key()) { + metadata.set_partition_key(msgMetadata.partition_key()); + } + if (msgMetadata.has_ordering_key()) { + metadata.set_ordering_key(msgMetadata.ordering_key()); } - for (MessageBuilder::StringMap::const_iterator it = msg.impl_->properties().begin(); - it != msg.impl_->properties().end(); it++) { - proto::KeyValue* keyValue = proto::KeyValue().New(); - keyValue->set_key(it->first); - keyValue->set_value(it->second); + metadata.mutable_properties()->Reserve(msgMetadata.properties_size()); + for (int i = 0; i < msgMetadata.properties_size(); i++) { + auto 
keyValue = proto::KeyValue().New(); + *keyValue = msgMetadata.properties(i); metadata.mutable_properties()->AddAllocated(keyValue); } - if (msg.impl_->getEventTimestamp() != 0) { - metadata.set_event_time(msg.impl_->getEventTimestamp()); + if (msgMetadata.has_event_time()) { + metadata.set_event_time(msgMetadata.event_time()); + } + + if (msgMetadata.has_sequence_id()) { + metadata.set_sequence_id(msgMetadata.sequence_id()); } // Format of batch message @@ -726,7 +735,7 @@ uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, batchPayLoad.bytesWritten(msgMetadataSize); batchPayLoad.write(msg.impl_->payload.data(), payloadSize); - return msg.impl_->metadata.sequence_id(); + return msgMetadata.sequence_id(); } Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex) { diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc index 76e408ffef4..b928945cfae 100644 --- a/pulsar-client-cpp/lib/Message.cc +++ b/pulsar-client-cpp/lib/Message.cc @@ -79,12 +79,38 @@ Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, S impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties()); impl_->topicName_ = &topicName; + impl_->metadata.clear_properties(); + if (singleMetadata.properties_size() > 0) { + impl_->metadata.mutable_properties()->Reserve(singleMetadata.properties_size()); + for (int i = 0; i < singleMetadata.properties_size(); i++) { + auto keyValue = proto::KeyValue().New(); + *keyValue = singleMetadata.properties(i); + impl_->metadata.mutable_properties()->AddAllocated(keyValue); + } + } + if (singleMetadata.has_partition_key()) { impl_->metadata.set_partition_key(singleMetadata.partition_key()); + } else { + impl_->metadata.clear_partition_key(); + } + + if (singleMetadata.has_ordering_key()) { + impl_->metadata.set_ordering_key(singleMetadata.ordering_key()); + } else { + impl_->metadata.clear_ordering_key(); } if 
(singleMetadata.has_event_time()) { impl_->metadata.set_event_time(singleMetadata.event_time()); + } else { + impl_->metadata.clear_event_time(); + } + + if (singleMetadata.has_sequence_id()) { + impl_->metadata.set_sequence_id(singleMetadata.sequence_id()); + } else { + impl_->metadata.clear_sequence_id(); } } diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc index ebe6bf159ba..62fd5fff25c 100644 --- a/pulsar-client-cpp/tests/BatchMessageTest.cc +++ b/pulsar-client-cpp/tests/BatchMessageTest.cc @@ -1071,3 +1071,81 @@ TEST(BatchMessageTest, testProducerQueueWithBatches) { ASSERT_EQ(rejectedMessges, 10); } + +TEST(BatchMessageTest, testSingleMessageMetadata) { + const auto topic = "BatchMessageTest-SingleMessageMetadata-" + std::to_string(time(nullptr)); + constexpr int numMessages = 3; + + Client client(lookupUrl); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer( + topic, ProducerConfiguration().setBatchingMaxMessages(numMessages), producer)); + + producer.sendAsync(MessageBuilder() + .setContent("msg-0") + .setPartitionKey("key-0") + .setOrderingKey("ordering-key-0") + .setEventTimestamp(10UL) + .setProperty("k0", "v0") + .setProperty("k1", "v1") + .build(), + nullptr); + producer.sendAsync(MessageBuilder() + .setContent("msg-1") + .setOrderingKey("ordering-key-1") + .setEventTimestamp(11UL) + .setProperty("k2", "v2") + .build(), + nullptr); + producer.sendAsync(MessageBuilder().setContent("msg-2").build(), nullptr); + ASSERT_EQ(ResultOk, producer.flush()); + + Message msgs[numMessages]; + for (int i = 0; i < numMessages; i++) { + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg, 3000)); + msgs[i] = msg; + LOG_INFO("message " << i << ": " << msg.getDataAsString() + << ", key: " << (msg.hasPartitionKey() ? msg.getPartitionKey() : "(null)") + << ", ordering key: " << (msg.hasOrderingKey() ? 
msg.getOrderingKey() : "(null)") + << ", event time: " << (msg.getEventTimestamp()) + << ", properties count: " << msg.getProperties().size() + << ", has schema version: " << msg.hasSchemaVersion()); + } + + ASSERT_EQ(msgs[0].getDataAsString(), "msg-0"); + ASSERT_TRUE(msgs[0].hasPartitionKey()); + ASSERT_EQ(msgs[0].getPartitionKey(), "key-0"); + ASSERT_TRUE(msgs[0].hasOrderingKey()); + ASSERT_EQ(msgs[0].getOrderingKey(), "ordering-key-0"); + ASSERT_EQ(msgs[0].getEventTimestamp(), 10UL); + ASSERT_EQ(msgs[0].getProperties().size(), 2); + ASSERT_TRUE(msgs[0].hasProperty("k0")); + ASSERT_EQ(msgs[0].getProperty("k0"), "v0"); + ASSERT_TRUE(msgs[0].hasProperty("k1")); + ASSERT_EQ(msgs[0].getProperty("k1"), "v1"); + ASSERT_FALSE(msgs[0].hasSchemaVersion()); + + ASSERT_EQ(msgs[1].getDataAsString(), "msg-1"); + ASSERT_FALSE(msgs[1].hasPartitionKey()); + ASSERT_TRUE(msgs[1].hasOrderingKey()); + ASSERT_EQ(msgs[1].getOrderingKey(), "ordering-key-1"); + ASSERT_EQ(msgs[1].getEventTimestamp(), 11UL); + ASSERT_EQ(msgs[1].getProperties().size(), 1); + ASSERT_TRUE(msgs[1].hasProperty("k2")); + ASSERT_EQ(msgs[1].getProperty("k2"), "v2"); + ASSERT_FALSE(msgs[1].hasSchemaVersion()); + + ASSERT_EQ(msgs[2].getDataAsString(), "msg-2"); + ASSERT_FALSE(msgs[2].hasPartitionKey()); + ASSERT_FALSE(msgs[2].hasOrderingKey()); + ASSERT_EQ(msgs[2].getEventTimestamp(), 0UL); + ASSERT_EQ(msgs[2].getProperties().size(), 0); + ASSERT_FALSE(msgs[2].hasSchemaVersion()); + + client.close(); +} diff --git a/pulsar-client-cpp/tests/SchemaTest.cc b/pulsar-client-cpp/tests/SchemaTest.cc index e11069fb06b..45491b9cf81 100644 --- a/pulsar-client-cpp/tests/SchemaTest.cc +++ b/pulsar-client-cpp/tests/SchemaTest.cc @@ -70,3 +70,37 @@ TEST(SchemaTest, testSchema) { res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer); ASSERT_EQ(ResultIncompatibleSchema, res); } + +TEST(SchemaTest, testHasSchemaVersion) { + Client client(lookupUrl); + std::string topic = "SchemaTest-HasSchemaVersion"; + SchemaInfo 
stringSchema(SchemaType::STRING, "String", ""); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic + "1", "sub", ConsumerConfiguration().setSchema(stringSchema), + consumer)); + Producer batchedProducer; + ASSERT_EQ(ResultOk, client.createProducer(topic + "1", ProducerConfiguration().setSchema(stringSchema), + batchedProducer)); + Producer nonBatchedProducer; + ASSERT_EQ(ResultOk, client.createProducer(topic + "1", ProducerConfiguration().setSchema(stringSchema), + nonBatchedProducer)); + + ASSERT_EQ(ResultOk, batchedProducer.send(MessageBuilder().setContent("msg-0").build())); + ASSERT_EQ(ResultOk, nonBatchedProducer.send(MessageBuilder().setContent("msg-1").build())); + + Message msgs[2]; + ASSERT_EQ(ResultOk, consumer.receive(msgs[0], 3000)); + ASSERT_EQ(ResultOk, consumer.receive(msgs[1], 3000)); + + std::string schemaVersion(8, '\0'); + ASSERT_EQ(msgs[0].getDataAsString(), "msg-0"); + ASSERT_TRUE(msgs[0].hasSchemaVersion()); + ASSERT_EQ(msgs[0].getSchemaVersion(), schemaVersion); + + ASSERT_EQ(msgs[1].getDataAsString(), "msg-1"); + ASSERT_TRUE(msgs[1].hasSchemaVersion()); + ASSERT_EQ(msgs[1].getSchemaVersion(), schemaVersion); + + client.close(); +}
