chlyzzo opened a new issue #12596:
URL: https://github.com/apache/pulsar/issues/12596


   # What issue do you find in Pulsar docs?
   https://pulsar.apache.org/docs/en/client-libraries-python/
   Following the doc, I cannot read messages from a topic using Python.
   
   spark:3.1.2
   
   scala:2.12
   
   python:3.7
   
   pulsar:2.8.0
   
   spark-pulsar:
   <groupId>org.apache.pulsar</groupId>
   <artifactId>pulsar-spark</artifactId>
   <version>2.8.0</version>
   
   python-pulsar:
   python3 -m pip install pulsar-client=='2.8.1'
   python3 -m pip install pulsar-client[avro]
   
   ---------------
   First, in Spark (Scala), I define the class used as the topic schema:
   
   object SchemaUtils {
     class Users () {
       var name: String = null
       var age: Int = -1
       
       def this(name: String, age: Int) {
         this()
         this.name = name
         this.age = age
       }
       override def toString: String = s"{name: $name, age: $age}"
       def getName(): String = {
           return name;
       }
       def getAge() : Int = {
           return age;
       }
       
     }
   }
   
   ------------
   Second, in Spark (Scala), I send a message to the topic using the Users schema:
   import spatio.topics.SchemaUtils.Users
   import org.apache.pulsar.client.api.{PulsarClient, Schema}
   
   object clientProducer {
     def main(args: Array[String]): Unit = {
       val user = new Users("werewr", 35)
       println(user)
       val service_url_tn = "****"
       val topic = "persistent://st/forever/scala-Users-topic"
       val client = PulsarClient.builder()
         .serviceUrl(service_url_tn)
         .build()
       val producer = client.newProducer(Schema.AVRO(classOf[Users])).topic(topic).create()
       producer.send(user)
       println("send success" + user.toString())
     }
   }
   ------------------
   Third, the message was sent successfully, and the schema is registered on the topic:
   # ./bin/pulsar-admin schemas get persistent://st/forever/scala-Users-topic
   {
     "version": 0,
     "schemaInfo": {
       "name": "scala-Users-topic",
       "schema": {
         "type": "record",
         "name": "Users",
         "namespace": "spatio.topics.SchemaUtils$",
         "fields": [
           {
             "name": "name",
             "type": [
               "null",
               "string"
             ]
           },
           {
             "name": "age",
             "type": "int"
           }
         ]
       },
       "type": "AVRO",
       "properties": {
         "__alwaysAllowNull": "true"
       }
     }
   }
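
To make the registered schema concrete, here is a minimal sketch that hand-encodes one record the way the Avro binary encoding lays it out for this schema: `name` is a union of `"null"` and `"string"`, so it is prefixed with a branch index, while `age` is a plain `"int"` with no prefix. The helper names `zigzag_varint` and `encode_user` are made up for illustration and are not part of any Pulsar or Avro library.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an integer as an Avro int/long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps small magnitudes to small unsigned values
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_user(name, age: int) -> bytes:
    """Hypothetical helper: encode one Users record per the schema shown above."""
    out = bytearray()
    if name is None:
        out += zigzag_varint(0)          # union branch 0: "null", no payload
    else:
        raw = name.encode("utf-8")
        out += zigzag_varint(1)          # union branch 1: "string"
        out += zigzag_varint(len(raw))   # string length in bytes
        out += raw
    out += zigzag_varint(age)            # plain "int": no union branch index
    return bytes(out)


print(encode_user("werewr", 35))
```

For the record sent above (`"werewr"`, 35) this yields a 9-byte payload: one branch-index byte, one length byte, six bytes of text, and one byte for the age.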
   -----------------
   Fourth, in Spark (Scala), I read the topic and get the message:
   
   object clientConsumer {
     implicit val formats = Serialization.formats(NoTypeHints)
     def main(args: Array[String]): Unit = {
       val service_url_tn = "****"
       val client = PulsarClient.builder()
         .serviceUrl(service_url_tn)
         .build()

       val topic = "persistent://st/forever/scala-Users-topic"
       val consumer = client.newConsumer(Schema.AVRO(classOf[Users]))
         .topic(topic)
         .subscriptionName("read-my-topic")
         .subscribeAsync().join()

       var cnt = 0
       while (true) {
         val user = consumer.receive()
         println(write(user.getValue)) // serialize the object to JSON for output
         cnt += 1
         println(cnt)
       }
     }
   }
   
   
   and get:
   log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
   {"name":"werewr","age":35}
   1
   ----------------
   Fifth, I want to read the topic in Python, but I get an error. I define the record class:
   
   from pulsar.schema import Record, String, Integer

   class Users(Record):
       name = String()
       age = Integer()
   
   
   and read the topic:
   
   from pulsar.schema import AvroSchema
   import pulsar
   from pulsar import MessageId
   from pulsar import InitialPosition
   import schemaProduce
   import pulsar_topics
   
   client = pulsar.Client('****')
   topic = "persistent://st/forever/scala-Users-topic"
   msg_id = MessageId.earliest
   topic_schema = AvroSchema(pulsar_topics.Users)
   print(topic_schema, type(topic_schema))
   consumer = client.subscribe(topic=topic, subscription_name='python-read',
                               schema=topic_schema, initial_position=InitialPosition.Earliest)
   while True:
       msg = consumer.receive()
       print(msg.data())
       print(msg.value())
        
   consumer.close()
   client.close()
   ----------------
   
   but I get the error:
   
   b'\x02\x0cwerewrF'
   Traceback (most recent call last):
     File "***/schemaConsume.py", line 38, in <module>
       print(msg.value())
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pulsar/__init__.py", line 180, in value
       return self._schema.decode(self._message.data())
     File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pulsar/schema/schema_avro.py", line 70, in decode
       d = fastavro.schemaless_reader(buffer, self._schema)
     File "fastavro/_read.pyx", line 976, in fastavro._read.schemaless_reader
     File "fastavro/_read.pyx", line 988, in fastavro._read.schemaless_reader
     File "fastavro/_read.pyx", line 659, in fastavro._read._read_data
     File "fastavro/_read.pyx", line 516, in fastavro._read.read_record
     File "fastavro/_read.pyx", line 651, in fastavro._read._read_data
     File "fastavro/_read.pyx", line 436, in fastavro._read.read_union
   IndexError: list index out of range
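
One way to narrow this down is to decode the printed payload by hand. The sketch below is pure Python and uses neither `pulsar` nor `fastavro`; all function names are hypothetical. It reads the bytes twice: once with `age` as a plain `"int"`, as in the schema registered by the Scala producer, and once with `age` as a `["null", "int"]` union. The second reading is only an assumption about what the Python `Users` record might generate for `Integer()`, not something the output above confirms, but it does fail in the same place as the traceback.

```python
import io

PAYLOAD = b'\x02\x0cwerewrF'  # the bytes printed by msg.data() above


def read_long(buf):
    """Read one Avro int/long: base-128 varint, then undo the zigzag mapping."""
    shift = 0
    result = 0
    while True:
        chunk = buf.read(1)
        if not chunk:
            raise EOFError("unexpected end of payload")
        byte = chunk[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def read_null(buf):
    return None


def read_string(buf):
    length = read_long(buf)
    return buf.read(length).decode("utf-8")


def read_union(buf, branches):
    index = read_long(buf)       # a union value starts with its branch index
    return branches[index](buf)  # raises IndexError when the index is out of range


def decode_as_written(data):
    """Reader that matches the registered schema: age is a plain int."""
    buf = io.BytesIO(data)
    name = read_union(buf, [read_null, read_string])
    age = read_long(buf)
    return {"name": name, "age": age}


def decode_with_nullable_age(data):
    """Assumed reader: age treated as a ["null", "int"] union."""
    buf = io.BytesIO(data)
    name = read_union(buf, [read_null, read_string])
    age = read_union(buf, [read_null, read_long])
    return {"name": name, "age": age}


print(decode_as_written(PAYLOAD))
try:
    decode_with_nullable_age(PAYLOAD)
except IndexError as exc:
    print("IndexError:", exc)
```

With the plain-int reader the payload decodes to the record that was sent. With the union reader, the byte `F` (0x46) is interpreted as branch index 35 of a two-branch union, which raises `IndexError: list index out of range`. If that reading is right, the Python record would need `age` declared as non-nullable; whether the installed pulsar-client offers an option for that (for example a `required` flag on fields) should be checked against the client documentation for that version.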
   
   -----
   So, how can I read the topic messages from Python?
   
   
   
   

