BewareMyPower opened a new issue, #108:
URL: https://github.com/apache/pulsar-client-python/issues/108

   ## How to reproduce
   
   Run Pulsar 2.11 in standalone mode.
   
   First, create a Python consumer whose schema is a class with a string field `name` and an integer field `age`:
   
   ```python
   import pulsar
   from pulsar.schema import *
   import fastavro
   
   class User(Record):
       name = String()
       age = Integer()
   
   print(fastavro.parse_schema(User.schema()))
   
   client = pulsar.Client('pulsar://localhost:6650')
   consumer = client.subscribe('my-topic',
                               subscription_name="sub",
                               schema=AvroSchema(User))
   
   while True:
       try:
           msg = consumer.receive()
           version = int.from_bytes(msg.schema_version().encode('ascii'), byteorder='big')
           print(f"Received {len(msg.data())} bytes id='{msg.message_id()}' version='{version}'")
           value = msg.value()
           print(value)
           print(f"name: {value.name}, age: {value.age}")
           consumer.acknowledge(msg)
       except pulsar.Interrupted:
           print("Stop receiving messages")
           break
   
   client.close()
   ```
   
   Then, set the schema compatibility to `FORWARD`:
   
   ```bash
   curl -L http://localhost:8080/admin/v2/namespaces/public/default/schemaCompatibilityStrategy \
        -X PUT -H 'Content-Type: application/json' -d '"FORWARD"'
   ```
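For readers scripting the reproduction in Python, the same REST call can be built with the standard library. This is a minimal sketch that assumes, like the curl command, a standalone broker whose admin API listens on `localhost:8080`; the helper name `build_compatibility_request` is hypothetical.

```python
import json
import urllib.request

# Assumption: the standalone broker's admin REST API is reachable here,
# matching the curl command above.
ADMIN_URL = "http://localhost:8080"


def build_compatibility_request(tenant, namespace, strategy):
    """Build (without sending) the PUT request that sets a namespace's
    schema compatibility strategy, mirroring the curl command."""
    url = f"{ADMIN_URL}/admin/v2/namespaces/{tenant}/{namespace}/schemaCompatibilityStrategy"
    # The body is a JSON string literal: "FORWARD", including the quotes.
    body = json.dumps(strategy).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


req = build_compatibility_request("public", "default", "FORWARD")
print(req.get_method(), req.full_url)
print(req.data)
```

Sending it with `urllib.request.urlopen(req)` against a running broker should be equivalent to the curl call. One difference: `urllib` raises an `HTTPError` on a redirect for a `PUT` instead of following it as `curl -L` does, so the URL should point directly at the broker.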
   
   > **NOTE**: We have to set the schema compatibility strategy here; otherwise, the Java producer cannot be created. That is a separate bug, which I will discuss later.
   
   Then, run the Java producer to send a message (`User{name="xyz", age=10}`):
   
   ```java
       @AllArgsConstructor
       @Getter
       public class User {
           private final String name;
           private final int age;
       }
   ```
   
   ```java
           @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://172.22.48.50:6650").build();
           Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
                   .topic("my-topic").create();
           producer.send(new User("xyz", 10));
   ```
   
   Then, the Python consumer application will crash with the following logs:
   
   ```
   Received 6 bytes id='(1,0,-1,-1)' version='1'
   Traceback (most recent call last):
     File "consumer.py", line 42, in <module>
       value = msg.value()
     File "/home/xyz/pulsar-client-python/pulsar/__init__.py", line 130, in value
       return self._schema.decode(self._message.data())
     File "/home/xyz/pulsar-client-python/pulsar/schema/schema_avro.py", line 80, in decode
       d = fastavro.schemaless_reader(buffer, self._schema)
     File "fastavro/_read.pyx", line 1107, in fastavro._read.schemaless_reader
     File "fastavro/_read.pyx", line 1120, in fastavro._read.schemaless_reader
     File "fastavro/_read.pyx", line 749, in fastavro._read._read_data
     File "fastavro/_read.pyx", line 620, in fastavro._read.read_record
     File "fastavro/_read.pyx", line 740, in fastavro._read._read_data
     File "fastavro/_read.pyx", line 521, in fastavro._read.read_union
   IndexError: list index out of range
   ```
   
   ## Analysis
   
   There are two bugs. First, the schema definition generated by the Python client differs from the one generated by the Java client. Here are the two class definitions again:
   
   ```python
   class User(Record):
       name = String()
       age = Integer()
   ```
   
   ```java
       @AllArgsConstructor
       @Getter
       static class User {
           private final String name;
           private final int age;
       }
   ```
   
   Checking the schema definitions with `pulsar-admin schemas get my-topic -v <version>`, we can see that there are two versions of the schema, the first registered by the Python consumer and the second by the Java producer:
   
   ```json
   {
     "name": "my-topic",
     "schema": {
       "type": "record",
       "name": "User",
       "fields": [
         {
           "name": "name",
           "type": [
             "null",
             "string"
           ]
         },
         {
           "name": "age",
           "type": [
             "null",
             "int"
           ]
         }
       ]
     },
     "type": "AVRO",
     "timestamp": 1680074000129,
     "properties": {}
   }
   ```
   
   ```json
   {
     "name": "my-topic",
     "schema": {
       "type": "record",
       "name": "User",
       "namespace": "org.apache.pulsar.client.api.ConsumerIdTest",
       "fields": [
         {
           "name": "age",
           "type": "int"
         },
         {
           "name": "name",
           "type": [
             "null",
             "string"
           ]
         }
       ]
     },
     "type": "AVRO",
     "timestamp": 1680074158624,
     "properties": {
       "__alwaysAllowNull": "true",
       "__jsr310ConversionEnabled": "false"
     }
   }
   ```
   
   We can see that:
   - The Python client treats all fields as nullable (more precisely, as [Avro unions](https://avro.apache.org/docs/1.10.2/spec.html#Unions) with `null`), including the `int` field.
   - The Java client treats only the `String` field as nullable, since primitive types such as `int` cannot be `null`.
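These differences matter at the byte level because Avro's binary encoding carries no field names: a record is just its fields written in schema order, and a nullable field is written as a union branch index followed by the value. The traceback shows the consumer decoding with `fastavro.schemaless_reader(buffer, self._schema)`, i.e. with its own schema only. The following is a hand-rolled sketch of just enough of the Avro binary encoding (following the spec, not fastavro's API) to illustrate the failure; the field lists are copied from the two registered schemas above, and the helper names (`write_long`, `read_record`, etc.) are hypothetical.

```python
# Hand-rolled sketch of Avro binary encoding/decoding covering only the types
# used in this issue: "int", "string" and ["null", X] unions. Illustration
# only; fastavro implements the full specification.

def write_long(n):
    """Avro int/long: zigzag-map the value, then emit a base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def read_long(buf, pos):
    """Inverse of write_long; returns (value, next_position)."""
    z, shift = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            return (z >> 1) ^ -(z & 1), pos


def write_field(value, avro_type):
    if avro_type == "int":
        return write_long(value)
    if avro_type == "string":
        data = value.encode("utf-8")
        return write_long(len(data)) + data
    # Union such as ["null", "string"]: branch index first, then the value.
    if value is None:
        return write_long(avro_type.index("null"))
    branch = next(i for i, t in enumerate(avro_type) if t != "null")
    return write_long(branch) + write_field(value, avro_type[branch])


def read_field(buf, pos, avro_type):
    if avro_type == "int":
        return read_long(buf, pos)
    if avro_type == "string":
        length, pos = read_long(buf, pos)
        return buf[pos:pos + length].decode("utf-8"), pos + length
    index, pos = read_long(buf, pos)
    branch = avro_type[index]  # IndexError if the index exceeds the union size
    if branch == "null":
        return None, pos
    return read_field(buf, pos, branch)


def write_record(record, fields):
    # Fields are written in schema order, with no names or tags in the payload.
    return b"".join(write_field(record[f["name"]], f["type"]) for f in fields)


def read_record(buf, fields):
    record, pos = {}, 0
    for f in fields:
        record[f["name"]], pos = read_field(buf, pos, f["type"])
    return record


# Field lists copied from the two registered schemas.
PYTHON_FIELDS = [{"name": "name", "type": ["null", "string"]},
                 {"name": "age", "type": ["null", "int"]}]
JAVA_FIELDS = [{"name": "age", "type": "int"},
               {"name": "name", "type": ["null", "string"]}]

user = {"name": "xyz", "age": 10}
java_bytes = write_record(user, JAVA_FIELDS)      # what the Java producer sends
python_bytes = write_record(user, PYTHON_FIELDS)  # what a Python producer would send
print(len(java_bytes), java_bytes)      # 6 b'\x14\x02\x06xyz'
print(len(python_bytes), python_bytes)  # 7 b'\x02\x06xyz\x02\x14'
print(read_record(java_bytes, JAVA_FIELDS))  # {'age': 10, 'name': 'xyz'}

# Decoding the Java payload with the Python field list: the first byte (0x14,
# i.e. age=10) is read as the union index of `name`, and index 10 does not exist.
try:
    read_record(java_bytes, PYTHON_FIELDS)
    mismatch_error = None
except IndexError as exc:
    mismatch_error = exc
print(repr(mismatch_error))  # IndexError('list index out of range')
```

Under these assumptions, the 6-byte payload is consistent with the `Received 6 bytes` line in the log, and the mismatch produces the same `IndexError: list index out of range` that fastavro raises from `read_union`.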

