fewrwee2 opened a new issue #11873:
URL: https://github.com/apache/pulsar/issues/11873
#### Expected behavior
期望 MySQL 到 Pulsar 的 source 和 Pulsar 到 MySQL 的 sink 能够正常地实时同步数据。
#### Actual behavior
创建 MySQL 到 Pulsar 的 source 和 Pulsar 到 MySQL 的 sink 时,如果使用 Avro 格式,source 的日志中会报错,报错信息如下:
15:07:09.706 [public/default/mysql-avro-source-0] INFO
org.apache.pulsar.functions.sink.PulsarSink - Opening pulsar sink with config:
PulsarSinkConfig(processingGuarantees=ATLEAST_ONCE,
topic=dbserver1.test_pulsar.student, serdeClassName=null, schemaType=avro,
schemaProperties={}, typeClassName=org.apache.pulsar.common.schema.KeyValue,
forwardSourceMessageProperty=true,
producerConfig=ProducerConfig(maxPendingMessages=0,
maxPendingMessagesAcrossPartitions=0, useThreadLocalProducers=false,
cryptoConfig=null, batchBuilder=null))
15:07:09.892 [public/default/mysql-avro-source-0] ERROR
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Sink open produced
uncaught exception:
org.apache.avro.AvroTypeException: Unknown type: K
at
org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:410)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:663)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:759)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.reflect.ReflectData$AllowNull.createFieldSchema(ReflectData.java:86)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:615)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.specific.SpecificData$3.computeValue(SpecificData.java:335)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.specific.SpecificData$3.computeValue(SpecificData.java:332)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at java.lang.ClassValue.getFromHashMap(ClassValue.java:227)
~[?:1.8.0_181]
at java.lang.ClassValue.getFromBackup(ClassValue.java:209)
~[?:1.8.0_181]
at java.lang.ClassValue.get(ClassValue.java:115) ~[?:1.8.0_181]
at
org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:346)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.pulsar.client.impl.schema.util.SchemaUtil.extractAvroSchema(SchemaUtil.java:91)
~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
at
org.apache.pulsar.client.impl.schema.util.SchemaUtil.createAvroSchema(SchemaUtil.java:78)
~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
at
org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo(SchemaUtil.java:51)
~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
at
org.apache.pulsar.client.impl.schema.AvroSchema.of(AvroSchema.java:93)
~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:160)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:212)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:237)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.source.TopicSchema.lambda$getSchema$1(TopicSchema.java:73)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
~[?:1.8.0_181]
at
org.apache.pulsar.functions.source.TopicSchema.getSchema(TopicSchema.java:73)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.sink.PulsarSink.initializeSchema(PulsarSink.java:403)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.sink.PulsarSink.open(PulsarSink.java:324)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:782)
[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
at
org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:212)
[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
at
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:234)
[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
15:07:09.895 [public/default/mysql-avro-source-0] ERROR
org.apache.pulsar.functions.instance.JavaInstanceRunnable -
[public/default/mysql-avro-source:0] Uncaught exception in Java Instance
org.apache.avro.AvroTypeException: Unknown type: K
at
org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:410)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:663)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:759)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.reflect.ReflectData$AllowNull.createFieldSchema(ReflectData.java:86)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:615)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.specific.SpecificData$3.computeValue(SpecificData.java:335)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.avro.specific.SpecificData$3.computeValue(SpecificData.java:332)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at java.lang.ClassValue.getFromHashMap(ClassValue.java:227)
~[?:1.8.0_181]
at java.lang.ClassValue.getFromBackup(ClassValue.java:209)
~[?:1.8.0_181]
at java.lang.ClassValue.get(ClassValue.java:115) ~[?:1.8.0_181]
at
org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:346)
~[org.apache.avro-avro-1.9.1.jar:1.9.1]
at
org.apache.pulsar.client.impl.schema.util.SchemaUtil.extractAvroSchema(SchemaUtil.java:91)
~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
at
org.apache.pulsar.client.impl.schema.util.SchemaUtil.createAvroSchema(SchemaUtil.java:78)
~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
at
org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo(SchemaUtil.java:51)
~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
at
org.apache.pulsar.client.impl.schema.AvroSchema.of(AvroSchema.java:93)
~[org.apache.pulsar-pulsar-client-original-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:160)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:212)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:237)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.source.TopicSchema.lambda$getSchema$1(TopicSchema.java:73)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
~[?:1.8.0_181]
at
org.apache.pulsar.functions.source.TopicSchema.getSchema(TopicSchema.java:73)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.sink.PulsarSink.initializeSchema(PulsarSink.java:403)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.sink.PulsarSink.open(PulsarSink.java:324)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:2.7.2]
at
org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:782)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
at
org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:212)
~[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
at
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:234)
[org.apache.pulsar-pulsar-functions-instance-2.7.2.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
15:07:09.900 [public/default/mysql-avro-source-0] INFO
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
15:07:09.903 [public/default/mysql-avro-source-0] INFO
org.apache.pulsar.functions.instance.JavaInstanceRunnable - Unloading JAR files
for function InstanceConfig(instanceId=0,
functionId=a9eb4705-ef45-4449-b9ce-5521fe940bac,
functionVersion=8ce3c1fc-e63e-4da2-993f-2b9fe4530f2b, functionDetails=tenant:
"public"
namespace: "default"
name: "mysql-avro-source"
className: "org.apache.pulsar.functions.api.utils.IdentityFunction"
autoAck: true
parallelism: 1
source {
className: "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource"
configs:
"{\"database.user\":\"root\",\"database.server.id\":\"184054\",\"database.server.name\":\"dbserver1\",\"database.port\":\"3306\",\"database.hostname\":\"192.168.1.111\",\"database.password\":\"dk123456\",\"offset.storage.topic\":\"offset-topic\",\"value.converter\":\"org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter\",\"database.whitelist\":\"test_pulsar\",\"key.converter\":\"org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter\",\"database.history\":\"org.apache.pulsar.io.debezium.PulsarDatabaseHistory\",\"pulsar.service.url\":\"pulsar://192.168.1.121:6650\",\"database.history.pulsar.topic\":\"history-topic\"}"
typeClassName: "org.apache.pulsar.common.schema.KeyValue"
}
sink {
topic: "dbserver1.test_pulsar.student"
typeClassName: "org.apache.pulsar.common.schema.KeyValue"
schemaType: "avro"
forwardSourceMessageProperty: true
}
resources {
cpu: 1.0
ram: 1073741824
disk: 10737418240
}
componentType: SOURCE
#### Steps to reproduce
How can we reproduce the issue?
#### System configuration
Pulsar 版本:2.7.2
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]