BewareMyPower opened a new issue, #15036:
URL: https://github.com/apache/pulsar/issues/15036

   **Describe the bug**
   When the C++ client sends a message to a topic with a protobuf native schema, the message does not carry a schema version.
   
   **To Reproduce**
   First, run the C++ unit test, which sends a message to the topic `ProtobufSchemaTest-testEndToEnd`.
   
   ```bash
   ./tests/main --gtest_filter='ProtobufNativeSchemaTest.testEndToEnd'  
   ```
   
   Querying the topic's schema shows that it is already `PROTOBUF_NATIVE`:
   
   ```bash
   $ curl -L http://localhost:8080/admin/v2/schemas/public/default/ProtobufSchemaTest-testEndToEnd/schema
   {"version":0,"type":"PROTOBUF_NATIVE","timestamp":0,"data":"{\"fileDescriptorSet\":\"CtMDCgpUZXN0LnByb3RvEgVwcm90bxoSRXh0ZXJuYWxUZXN0LnByb3RvImUKClN1Yk1lc3NhZ2USCwoDZm9vGAEgASgJEgsKA2JhchgCIAEoARo9Cg1OZXN0ZWRNZXNzYWdlEgsKA3VybBgBIAEoCRINCgV0aXRsZRgCIAEoCRIQCghzbmlwcGV0cxgDIAMoCSLlAQoLVGVzdE1lc3NhZ2USEwoLc3RyaW5nRmllbGQYASABKAkSEwoLZG91YmxlRmllbGQYAiABKAESEAoIaW50RmllbGQYBiABKAUSIQoIdGVzdEVudW0YBCABKA4yDy5wcm90by5UZXN0RW51bRImCgtuZXN0ZWRGaWVsZBgFIAEoCzIRLnByb3RvLlN1Yk1lc3NhZ2USFQoNcmVwZWF0ZWRGaWVsZBgKIAMoCRI4Cg9leHRlcm5hbE1lc3NhZ2UYCyABKAsyHy5wcm90by5leHRlcm5hbC5FeHRlcm5hbE1lc3NhZ2UqJAoIVGVzdEVudW0SCgoGU0hBUkVEEAASDAoIRkFJTE9WRVIQAUItCiVvcmcuYXBhY2hlLnB1bHNhci5jbGllbnQuc2NoZW1hLnByb3RvQgRUZXN0YgZwcm90bzMKoAEKEkV4dGVybmFsVGVzdC5wcm90bxIOcHJvdG8uZXh0ZXJuYWwiOwoPRXh0ZXJuYWxNZXNzYWdlEhMKC3N0cmluZ0ZpZWxkGAEgASgJEhMKC2RvdWJsZUZpZWxkGAIgASgBQjUKJW9yZy5hcGFjaGUucHVsc2FyLmNsaWVudC5zY2hlbWEucHJvdG9CDEV4dGVybmFsVGVzdGIGcHJvdG8z\",\"rootMessageTypeName\":\"proto.TestMessage\",\"rootFileDescriptorName\":\"Test.proto\"}","properties":{}}
   ```
   
   Then run the following Java application, which sends another message (using the same [Test.proto](https://github.com/apache/pulsar/blob/master/pulsar-client/src/test/proto/Test.proto)) and then consumes both messages.
   
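   ```java
   import java.util.concurrent.TimeUnit;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.SubscriptionInitialPosition;
   import org.apache.pulsar.client.api.schema.GenericRecord;
   import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
   // Generated from pulsar-client/src/test/proto/Test.proto
   import org.apache.pulsar.client.schema.proto.Test;

   public class ProtobufNativeSchemaRepro {
       public static void main(String[] args) throws Exception {
           final String topic = "ProtobufSchemaTest-testEndToEnd";
           try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
               // Send a second message from the Java producer with the same protobuf schema
               Producer<Test.TestMessage> producer = client.newProducer(ProtobufNativeSchema.of(Test.TestMessage.class))
                       .topic(topic)
                       .create();
               producer.send(Test.TestMessage.newBuilder().setTestEnum(Test.TestEnum.SHARED).build());
   
               // Consume both messages with the typed protobuf schema and print their schema versions
               Consumer<Test.TestMessage> consumer = client.newConsumer(ProtobufNativeSchema.of(Test.TestMessage.class))
                       .topic(topic)
                       .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                       .subscriptionName("my-sub")
                       .subscribe();
               for (int i = 0; i < 2; i++) {
                   final Message<Test.TestMessage> msg = consumer.receive(2, TimeUnit.SECONDS);
                   if (msg == null) break;
                   // NOTE: getReaderSchema() returns null in this case
                   System.out.println(i + ", enum value: " + msg.getValue().getTestEnum()
                           + ", schema version: "
                           + (msg.getSchemaVersion() == null ? "(null)" : Schema.INT64.decode(msg.getSchemaVersion()))
                   );
               }
   
               // Consume both messages again with AUTO_CONSUME and print their reader schemas
               Consumer<GenericRecord> autoConsumer = client.newConsumer(Schema.AUTO_CONSUME())
                       .topic(topic)
                       .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                       .subscriptionName("my-sub-auto-consume")
                       .subscribe();
               for (int i = 0; i < 2; i++) {
                   final Message<GenericRecord> msg = autoConsumer.receive(2, TimeUnit.SECONDS);
                   if (msg == null) break;
                   System.out.println(i + " reader schema: "
                           + msg.getReaderSchema().map(Object::toString).orElse("(null)"));
               }
           }
       }
   }
   ```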
   
   The output is
   
   ```
   0, enum value: FAILOVER, schema version: (null)
   1, enum value: SHARED, schema version: 0
   0 reader schema: org.apache.pulsar.client.impl.schema.BytesSchema@72cc7e6f
   1 reader schema: VersionedSchema(type=PROTOBUF_NATIVE,schemaVersion=\x00\x00\x00\x00\x00\x00\x00\x00,name=public/default/ProtobufSchemaTest-testEndToEnd)
   ```
   
   Message 0, whose enum value is `FAILOVER`, was sent by the C++ producer. However, it does not carry a schema version, and its reader schema is `BytesSchema` rather than `VersionedSchema`.
   
   BTW, if we add the following code to `TEST(ProtobufNativeSchemaTest, testEndToEnd)`:
   
   ```c++
       if (msg.hasSchemaVersion()) {
           std::cout << "message's schema version: " << msg.getSchemaVersion() << std::endl;
       } else {
           std::cout << "no schema version" << std::endl;
       }
   ```
   
   we can see the output is `no schema version`.
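   Note that `Message::getSchemaVersion()` in the C++ client returns the version as raw bytes in a `std::string`, so even once a version is present, streaming it to `std::cout` as above prints binary data rather than a number. Below is a minimal sketch for turning it into a readable value, assuming the version is the 8-byte big-endian integer that the Java client decodes with `Schema.INT64` (the `\x00...\x00` bytes in the Java output above correspond to version 0). `decodeSchemaVersion` is a hypothetical helper, not part of the Pulsar C++ API.

   ```cpp
   #include <cassert>
   #include <cstdint>
   #include <iostream>
   #include <optional>
   #include <string>

   // Hypothetical helper (not part of the Pulsar C++ API): interpret raw schema
   // version bytes as an 8-byte big-endian signed integer, assuming the same
   // layout the Java client decodes with Schema.INT64. Returns std::nullopt if
   // the input is not exactly 8 bytes (e.g. an empty, missing schema version).
   std::optional<int64_t> decodeSchemaVersion(const std::string& bytes) {
       if (bytes.size() != 8) {
           return std::nullopt;
       }
       uint64_t value = 0;
       for (unsigned char byte : bytes) {
           value = (value << 8) | byte;  // most significant byte first
       }
       return static_cast<int64_t>(value);
   }

   int main() {
       const std::string v0(8, '\0');                                    // version 0
       const std::string v258("\x00\x00\x00\x00\x00\x00\x01\x02", 8);  // version 258
       assert(!decodeSchemaVersion("").has_value());                   // no schema version
       std::cout << *decodeSchemaVersion(v0) << "\n";    // prints 0
       std::cout << *decodeSchemaVersion(v258) << "\n";  // prints 258
       return 0;
   }
   ```

   In the test, printing `*decodeSchemaVersion(msg.getSchemaVersion())` after checking `msg.hasSchemaVersion()` would show a readable version number once the bug is fixed.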
   
   **Expected behavior**
   Messages sent with a protobuf native schema should carry the schema version, i.e. the following check should pass:
   
   ```c++
   ASSERT_TRUE(msg.hasSchemaVersion());
   ```
   
   In addition, we might need to add a `Message::getReaderSchema()` API to the C++ client.

