dennisylyung edited a comment on issue #6034: [Pulsar IO][Issue 5633]Support avro schema for debezium connector
URL: https://github.com/apache/pulsar/pull/6034#issuecomment-603635083

I have tried to build and run it.

When I use JsonConverter (org.apache.kafka.connect.json.JsonConverter), I get the following error:

```
12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Step 9: scanned 40 rows in 1 tables in 00:00:00.228
12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Step 10: committing transaction
12:29:55.939 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Step 11: releasing table read locks to enable MySQL writes
12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Writes to MySQL prevented for a total of 00:00:02.03
12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO io.debezium.connector.mysql.SnapshotReader - Completed snapshot in 00:00:03.829
12:29:56.095 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [null] Creating producer on cnx [id: 0x7d5e380e, L:/172.27.240.97:60775 - R:pulsar.data.ztore.com/34.71.116.218:6650]
12:29:56.336 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] Created producer on cnx [id: 0x7d5e380e, L:/172.27.240.97:60775 - R:pulsar.data.ztore.com/34.71.116.218:6650]
12:29:56.634 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ProducerImpl - [ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] GetOrCreateSchema error
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Key schemas or Value schemas are different schema type, from key schema type is BYTES and to key schema is JSON, from value schema is BYTES and to value schema is JSON
    at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:997) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.ClientCnx.lambda$sendGetOrCreateSchema$22(ClientCnx.java:839) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_221]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_221]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) [?:1.8.0_221]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) [?:1.8.0_221]
    at org.apache.pulsar.client.impl.ClientCnx.handleGetOrCreateSchemaResponse(ClientCnx.java:733) [pulsar-client-original.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:329) [pulsar-common.jar:2.6.0-SNAPSHOT]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) [netty-codec-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) [netty-transport-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) [netty-common-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.43.Final.jar:4.1.43.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.43.Final.jar:4.1.43.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
```

I made sure to delete all existing schemas before running, so the schema incompatibility error does not stem from schemas left over from previous runs.

If I use AvroConverter (org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter), I get the following error instead:

```
12:24:17,895 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Requested thread factory for connector MySqlConnector, id = ztoreSalesDb named = binlog-client
12:24:17,907 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Requested thread factory for connector MySqlConnector, id = ztoreSalesDb named = snapshot
12:24:17,908 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] Threads - Creating thread debezium-mysqlconnector-ztoreSalesDb-snapshot
12:24:22,622 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] AvroDataConfig - AvroDataConfig values:
    connect.meta.data = true
    enhanced.avro.schema.support = false
    schemas.cache.config = 1000
12:24:24,488 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in sink write:
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: (byte[])"�����"; line: 1, column: 2]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: (byte[])"�����"; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) ~[jackson-core-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) ~[jackson-core-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690) ~[jackson-core-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2373) ~[jackson-core-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672) ~[jackson-core-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4231) ~[jackson-databind-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2725) ~[jackson-databind-2.10.1.jar:2.10.1]
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:45) ~[?:?]
    at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:90) ~[?:?]
    at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:43) ~[?:?]
    at org.apache.pulsar.common.schema.KeyValue.encode(KeyValue.java:99) ~[pulsar-client-api.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:128) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:37) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:155) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:297) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:444) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:427) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:282) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
12:24:24,499 WARN [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Failed to process result of message org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord@37912f1e
java.lang.RuntimeException: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: (byte[])"�����"; line: 1, column: 2]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:448) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:427) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:282) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: (byte[])"�����"; line: 1, column: 2]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: (byte[])"�����"; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) ~[jackson-core-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712) ~[jackson-core-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690) ~[jackson-core-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2373) ~[jackson-core-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672) ~[jackson-core-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4231) ~[jackson-databind-2.10.1.jar:2.10.1]
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2725) ~[jackson-databind-2.10.1.jar:2.10.1]
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:45) ~[?:?]
    at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:90) ~[?:?]
    at org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:43) ~[?:?]
    at org.apache.pulsar.common.schema.KeyValue.encode(KeyValue.java:99) ~[pulsar-client-api.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:128) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:37) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:155) ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:297) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:444) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    ... 3 more
12:24:24,501 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in source read:
java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
    at org.apache.pulsar.io.kafka.connect.KafkaConnectSource.read(KafkaConnectSource.java:165) ~[pulsar-io-kafka-connect-adaptor-2.6.0-SNAPSHOT.jar:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:460) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: java.lang.Exception: Sink Error
    at org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord.fail(KafkaConnectSource.java:322) ~[pulsar-io-kafka-connect-adaptor-2.6.0-SNAPSHOT.jar:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:285) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    ... 1 more
12:24:24,502 ERROR [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - [ztore-data/debezium-local/debezium-mysql-source:0] Uncaught exception in Java Instance
java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:464) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_221]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_221]
    at org.apache.pulsar.io.kafka.connect.KafkaConnectSource.read(KafkaConnectSource.java:165) ~[?:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:460) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    ... 2 more
Caused by: java.lang.Exception: Sink Error
    at org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord.fail(KafkaConnectSource.java:322) ~[?:?]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:285) ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
    ... 1 more
12:24:24,503 INFO [ztore-data/debezium-local/debezium-mysql-source-0] [instance: 0] JavaInstanceRunnable - Closing instance
```
