chlyzzo opened a new issue #12596: URL: https://github.com/apache/pulsar/issues/12596
# What issue do you find in Pulsar docs? https://pulsar.apache.org/docs/en/client-libraries-python/ Following this doc, I cannot read topic messages using Python. Versions: spark:3.1.2 scala:2.12 python:3.7 pulsar:2.8.0 spark-pulsar: <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-spark</artifactId> <version>2.8.0</version> python-pulsar: python3 -m pip install pulsar-client=='2.8.1' python3 -m pip install pulsar-client[avro] --------------- First, in Spark Scala, I create a topic schema: object SchemaUtils { class Users () { var name: String = null var age: Int = -1 def this(name: String, age: Int) { this() this.name = name this.age = age } override def toString: String = s"{name: $name, age: $age}" def getName(): String = { return name; } def getAge() : Int = { return age; } } } ------------ Second, in Spark Scala, I send a message to the topic using the Users schema: import spatio.topics.SchemaUtils.Users import org.apache.pulsar.client.api.{PulsarClient, Schema} object clientProducer { def main(args: Array[String]): Unit = { val user = new Users("werewr", 35) println(user) val service_url_tn = "****" val topic = "persistent://st/forever/scala-Users-topic" val client = PulsarClient.builder() .serviceUrl(service_url_tn) .build() val producer = client.newProducer(Schema.AVRO(classOf[Users])).topic(topic).create() producer.send(user) println("send success" + user.toString()) } } ------------------ Third, the message was sent successfully, and the registered schema is: # ./bin/pulsar-admin schemas get persistent://st/forever/scala-Users-topic { "version": 0, "schemaInfo": { "name": "scala-Users-topic", "schema": { "type": "record", "name": "Users", "namespace": "spatio.topics.SchemaUtils$", "fields": [ { "name": "name", "type": [ "null", "string" ] }, { "name": "age", "type": "int" } ] }, "type": "AVRO", "properties": { "__alwaysAllowNull": "true" } } } ----------------- Fourth, in Spark Scala, I read the topic and get the message: object clientConsumer { implicit val formats = Serialization.formats(NoTypeHints) 
def main(args: Array[String]): Unit = { val service_url_tn = "****" val client = PulsarClient.builder() .serviceUrl(service_url_tn) .build() val topic = "persistent://st/forever/scala-Users-topic" val consumer = client.newConsumer(Schema.AVRO(classOf[Users])) .topic(topic) .subscriptionName("read-my-topic") .subscribeAsync().join() var cnt = 0 while (true) { val user = consumer.receive() println(write(user.getValue)) // convert the object to JSON for output cnt +=1 println(cnt) } } } and I get: log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. {"name":"werewr","age":35} 1 ---------------- Fifth, in Python, I want to read the same topic, but I get an error. The Python schema is: class Users(Record): name = String() age = Integer() and the code to read the topic is: from pulsar.schema import AvroSchema import pulsar from pulsar import MessageId from pulsar import InitialPosition import schemaProduce import pulsar_topics client = pulsar.Client('****') topic = "persistent://st/forever/scala-Users-topic" msg_id = MessageId.earliest topic_schema = AvroSchema(pulsar_topics.Users) print(topic_schema, type(topic_schema)) consumer = client.subscribe(topic = topic, subscription_name='python-read', schema = topic_schema, initial_position=InitialPosition.Earliest) while True: msg = consumer.receive() print(msg.data()) print(msg.value()) consumer.close() client.close() ---------------- but I get this error: b'\x02\x0cwerewrF' Traceback (most recent call last): File "***/schemaConsume.py", line 38, in <module> print(msg.value()) File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pulsar/__init__.py", line 180, in value return self._schema.decode(self._message.data()) File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pulsar/schema/schema_avro.py", line 70, in decode d = fastavro.schemaless_reader(buffer, self._schema) File "fastavro/_read.pyx", line 976, in fastavro._read.schemaless_reader File "fastavro/_read.pyx", line 988, in fastavro._read.schemaless_reader 
File "fastavro/_read.pyx", line 659, in fastavro._read._read_data File "fastavro/_read.pyx", line 516, in fastavro._read.read_record File "fastavro/_read.pyx", line 651, in fastavro._read._read_data File "fastavro/_read.pyx", line 436, in fastavro._read.read_union IndexError: list index out of range ----- So, how can I read the topic message from Python? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
