fewrwee2 opened a new issue #11873:
URL: https://github.com/apache/pulsar/issues/11873


   #### Expected behavior
   
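   期望 MySQL 到 Pulsar 的 source 以及 Pulsar 到 MySQL 的 sink 能够正常地实时同步数据。
   (Expected: the MySQL-to-Pulsar source and the Pulsar-to-MySQL sink synchronize data in real time without errors.)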
   
   #### Actual behavior
   
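   创建 MySQL 到 Pulsar 的 source 和 Pulsar 到 MySQL 的 sink 时,使用 Avro 格式后,source 的日志中出现报错,内容如下:
   (When creating the MySQL-to-Pulsar source and the Pulsar-to-MySQL sink with the Avro format, the source log reports the following error:)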
   15:07:09.706 [public/default/mysql-avro-source-0] INFO  
org.apache.pulsar.functions.sink.PulsarSink - Opening pulsar sink with config: 
PulsarSinkConfig(processingGuarantees=ATLEAST_ONCE, 
topic=dbserver1.test_pulsar.student, serdeClassName=null, schemaType=avro, 
schemaProperties={}, typeClassName=org.apache.pulsar.common.schema.KeyValue, 
forwardSourceMessageProperty=true, 
producerConfig=ProducerConfig(maxPendingMessages=0, 
maxPendingMessagesAcrossPartitions=0, useThreadLocalProducers=false, 
cryptoConfig=null, batchBuilder=null))
   15:07:09.892 [public/default/mysql-avro-source-0] ERROR 
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Sink open produced 
uncaught exception: 
   org.apache.avro.AvroTypeException: Unknown type: K
           at 
org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:410) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:663) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:759) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.reflect.ReflectData$AllowNull.createFieldSchema(ReflectData.java:86)
 ~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:615) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.specific.SpecificData$3.computeValue(SpecificData.java:335) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.specific.SpecificData$3.computeValue(SpecificData.java:332) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at java.lang.ClassValue.getFromHashMap(ClassValue.java:227) 
~[?:1.8.0_181]
           at java.lang.ClassValue.getFromBackup(ClassValue.java:209) 
~[?:1.8.0_181]
           at java.lang.ClassValue.get(ClassValue.java:115) ~[?:1.8.0_181]
           at 
org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:346) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.pulsar.client.impl.schema.util.SchemaUtil.extractAvroSchema(SchemaUtil.java:91)
 ~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.client.impl.schema.util.SchemaUtil.createAvroSchema(SchemaUtil.java:78)
 ~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo(SchemaUtil.java:51)
 ~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.client.impl.schema.AvroSchema.of(AvroSchema.java:93) 
~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:160)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:212)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:237)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.source.TopicSchema.lambda$getSchema$1(TopicSchema.java:73)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at java.util.HashMap.computeIfAbsent(HashMap.java:1127) 
~[?:1.8.0_181]
           at 
org.apache.pulsar.functions.source.TopicSchema.getSchema(TopicSchema.java:73) 
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.sink.PulsarSink.initializeSchema(PulsarSink.java:403)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.sink.PulsarSink.open(PulsarSink.java:324) 
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:782)
 [org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
           at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:212)
 [org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
           at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:234)
 [org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
   15:07:09.895 [public/default/mysql-avro-source-0] ERROR 
org.apache.pulsar.functions.instance.JavaInstanceRunnable - 
[public/default/mysql-avro-source:0] Uncaught exception in Java Instance
   org.apache.avro.AvroTypeException: Unknown type: K
           at 
org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:410) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:663) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:759) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.reflect.ReflectData$AllowNull.createFieldSchema(ReflectData.java:86)
 ~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:615) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.specific.SpecificData$3.computeValue(SpecificData.java:335) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.avro.specific.SpecificData$3.computeValue(SpecificData.java:332) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at java.lang.ClassValue.getFromHashMap(ClassValue.java:227) 
~[?:1.8.0_181]
           at java.lang.ClassValue.getFromBackup(ClassValue.java:209) 
~[?:1.8.0_181]
           at java.lang.ClassValue.get(ClassValue.java:115) ~[?:1.8.0_181]
           at 
org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:346) 
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
           at 
org.apache.pulsar.client.impl.schema.util.SchemaUtil.extractAvroSchema(SchemaUtil.java:91)
 ~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.client.impl.schema.util.SchemaUtil.createAvroSchema(SchemaUtil.java:78)
 ~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo(SchemaUtil.java:51)
 ~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.client.impl.schema.AvroSchema.of(AvroSchema.java:93) 
~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:160)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:212)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:237)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.source.TopicSchema.lambda$getSchema$1(TopicSchema.java:73)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at java.util.HashMap.computeIfAbsent(HashMap.java:1127) 
~[?:1.8.0_181]
           at 
org.apache.pulsar.functions.source.TopicSchema.getSchema(TopicSchema.java:73) 
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.sink.PulsarSink.initializeSchema(PulsarSink.java:403)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.sink.PulsarSink.open(PulsarSink.java:324) 
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
           at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:782)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
           at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:212)
 ~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
           at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:234)
 [org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
   15:07:09.900 [public/default/mysql-avro-source-0] INFO  
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
   15:07:09.903 [public/default/mysql-avro-source-0] INFO  
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Unloading JAR files 
for function InstanceConfig(instanceId=0, 
functionId=a9eb4705-ef45-4449-b9ce-5521fe940bac, 
functionVersion=8ce3c1fc-e63e-4da2-993f-2b9fe4530f2b, functionDetails=tenant: 
"public"
   namespace: "default"
   name: "mysql-avro-source"
   className: "org.apache.pulsar.functions.api.utils.IdentityFunction"
   autoAck: true
   parallelism: 1
   source {
     className: "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource"
     configs: 
"{\"database.user\":\"root\",\"database.server.id\":\"184054\",\"database.server.name\":\"dbserver1\",\"database.port\":\"3306\",\"database.hostname\":\"192.168.1.111\",\"database.password\":\"dk123456\",\"offset.storage.topic\":\"offset-topic\",\"value.converter\":\"org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter\",\"database.whitelist\":\"test_pulsar\",\"key.converter\":\"org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter\",\"database.history\":\"org.apache.pulsar.io.debezium.PulsarDatabaseHistory\",\"pulsar.service.url\":\"pulsar://192.168.1.121:6650\",\"database.history.pulsar.topic\":\"history-topic\"}"
     typeClassName: "org.apache.pulsar.common.schema.KeyValue"
   }
   sink {
     topic: "dbserver1.test_pulsar.student"
     typeClassName: "org.apache.pulsar.common.schema.KeyValue"
     schemaType: "avro"
     forwardSourceMessageProperty: true
   }
   resources {
     cpu: 1.0
     ram: 1073741824
     disk: 10737418240
   }
   componentType: SOURCE
   
   #### Steps to reproduce
   
   How can we reproduce the issue
   
   #### System configuration
   Pulsar 版本:2.7.2 (Pulsar version: 2.7.2)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to