dennisylyung edited a comment on issue #6034: [Pulsar IO][Issue 5633]Support 
avro schema for debezium connector
URL: https://github.com/apache/pulsar/pull/6034#issuecomment-603635083
 
 
   I built this PR and ran it.
   With JsonConverter ("org.apache.kafka.connect.json.JsonConverter"), I get the following error:
   ```
   12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  
io.debezium.connector.mysql.SnapshotReader - Step 9: scanned 40 rows in 1 
tables in 00:00:00.228
   12:29:55.891 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  
io.debezium.connector.mysql.SnapshotReader - Step 10: committing transaction
   12:29:55.939 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  
io.debezium.connector.mysql.SnapshotReader - Step 11: releasing table read 
locks to enable MySQL writes
   12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  
io.debezium.connector.mysql.SnapshotReader - Writes to MySQL prevented for a 
total of 00:00:02.03
   12:29:56.044 [debezium-mysqlconnector-ztoreSalesDb-snapshot] INFO  
io.debezium.connector.mysql.SnapshotReader - Completed snapshot in 00:00:03.829
   12:29:56.095 [pulsar-client-io-1-1] INFO  
org.apache.pulsar.client.impl.ProducerImpl - 
[ztore-data/debezium-local/ztoreSalesDb] [null] Creating producer on cnx [id: 
0x7d5e380e, L:/172.27.240.97:60775 - R:pulsar.data.ztore.com/34.71.116.218:6650]
   12:29:56.336 [pulsar-client-io-1-1] INFO  
org.apache.pulsar.client.impl.ProducerImpl - 
[ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] Created producer on 
cnx [id: 0x7d5e380e, L:/172.27.240.97:60775 - 
R:pulsar.data.ztore.com/34.71.116.218:6650]
   12:29:56.634 [pulsar-client-io-1-1] WARN  
org.apache.pulsar.client.impl.ProducerImpl - 
[ztore-data/debezium-local/ztoreSalesDb] [pulsar-117-17] GetOrCreateSchema error
   
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: 
Key schemas or Value schemas are different schema type, from key schema type is 
BYTES and to key schema is JSON, from value schema is BYTES and to value schema 
is JSON
           at 
org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:997)
 ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
           at 
org.apache.pulsar.client.impl.ClientCnx.lambda$sendGetOrCreateSchema$22(ClientCnx.java:839)
 ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
           at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) 
~[?:1.8.0_221]
           at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 ~[?:1.8.0_221]
           at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
[?:1.8.0_221]
           at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
[?:1.8.0_221]
           at 
org.apache.pulsar.client.impl.ClientCnx.handleGetOrCreateSchemaResponse(ClientCnx.java:733)
 [pulsar-client-original.jar:2.6.0-SNAPSHOT]
           at 
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:329)
 [pulsar-common.jar:2.6.0-SNAPSHOT]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
 [netty-codec-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300)
 [netty-codec-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) 
[netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
 [netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) 
[netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) 
[netty-transport-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
 [netty-common-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
[netty-common-4.1.43.Final.jar:4.1.43.Final]
           at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [netty-common-4.1.43.Final.jar:4.1.43.Final]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   ```
   I made sure to delete all existing schemas before running, so the incompatible-schema error does not stem from schemas left over from previous runs.
   
   With AvroConverter ("org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter"), I get a different error:
   ```
   12:24:17,895 INFO [ztore-data/debezium-local/debezium-mysql-source-0] 
[instance: 0] Threads - Requested thread factory for connector MySqlConnector, 
id = ztoreSalesDb named = binlog-client
   12:24:17,907 INFO [ztore-data/debezium-local/debezium-mysql-source-0] 
[instance: 0] Threads - Requested thread factory for connector MySqlConnector, 
id = ztoreSalesDb named = snapshot
   12:24:17,908 INFO [ztore-data/debezium-local/debezium-mysql-source-0] 
[instance: 0] Threads - Creating thread 
debezium-mysqlconnector-ztoreSalesDb-snapshot
   12:24:22,622 INFO [ztore-data/debezium-local/debezium-mysql-source-0] 
[instance: 0] AvroDataConfig - AvroDataConfig values: 
        connect.meta.data = true
        enhanced.avro.schema.support = false
        schemas.cache.config = 1000
   
   12:24:24,488 INFO [ztore-data/debezium-local/debezium-mysql-source-0] 
[instance: 0] JavaInstanceRunnable - Encountered exception in sink write: 
   org.apache.kafka.common.errors.SerializationException: 
com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, 
code 0)): only regular white space (\r, \n, \t) is allowed between tokens
    at [Source: (byte[])"�����"; line: 1, column: 2]
   Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character 
((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between 
tokens
    at [Source: (byte[])"�����"; line: 1, column: 2]
        at 
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) 
~[jackson-core-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
 ~[jackson-core-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690)
 ~[jackson-core-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2373)
 ~[jackson-core-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672)
 ~[jackson-core-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4231)
 ~[jackson-databind-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2725) 
~[jackson-databind-2.10.1.jar:2.10.1]
        at 
org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:45)
 ~[?:?]
        at 
org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:90)
 ~[?:?]
        at 
org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:43)
 ~[?:?]
        at org.apache.pulsar.common.schema.KeyValue.encode(KeyValue.java:99) 
~[pulsar-client-api.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:128)
 ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:37)
 ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:155)
 ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:297) 
~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:444)
 [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:427)
 [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:282)
 [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   12:24:24,499 WARN [ztore-data/debezium-local/debezium-mysql-source-0] 
[instance: 0] JavaInstanceRunnable - Failed to process result of message 
org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord@37912f1e
   java.lang.RuntimeException: 
org.apache.kafka.common.errors.SerializationException: 
com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, 
code 0)): only regular white space (\r, \n, \t) is allowed between tokens
    at [Source: (byte[])"�����"; line: 1, column: 2]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:448)
 ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:427)
 ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:282)
 [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   Caused by: org.apache.kafka.common.errors.SerializationException: 
com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, 
code 0)): only regular white space (\r, \n, \t) is allowed between tokens
    at [Source: (byte[])"�����"; line: 1, column: 2]
   Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character 
((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between 
tokens
    at [Source: (byte[])"�����"; line: 1, column: 2]
        at 
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) 
~[jackson-core-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
 ~[jackson-core-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:690)
 ~[jackson-core-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2373)
 ~[jackson-core-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:672)
 ~[jackson-core-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4231)
 ~[jackson-databind-2.10.1.jar:2.10.1]
        at 
com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2725) 
~[jackson-databind-2.10.1.jar:2.10.1]
        at 
org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:45)
 ~[?:?]
        at 
org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:90)
 ~[?:?]
        at 
org.apache.pulsar.io.kafka.connect.schema.KafkaSchema.encode(KafkaSchema.java:43)
 ~[?:?]
        at org.apache.pulsar.common.schema.KeyValue.encode(KeyValue.java:99) 
~[pulsar-client-api.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:128)
 ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.client.impl.schema.KeyValueSchema.encode(KeyValueSchema.java:37)
 ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.value(TypedMessageBuilderImpl.java:155)
 ~[pulsar-client-original.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:297) 
~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:444)
 ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        ... 3 more
   12:24:24,501 INFO [ztore-data/debezium-local/debezium-mysql-source-0] 
[instance: 0] JavaInstanceRunnable - Encountered exception in source read: 
   java.util.concurrent.ExecutionException: java.lang.Exception: Sink Error
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_221]
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
~[?:1.8.0_221]
        at 
org.apache.pulsar.io.kafka.connect.KafkaConnectSource.read(KafkaConnectSource.java:165)
 ~[pulsar-io-kafka-connect-adaptor-2.6.0-SNAPSHOT.jar:?]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:460)
 [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246)
 [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   Caused by: java.lang.Exception: Sink Error
        at 
org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord.fail(KafkaConnectSource.java:322)
 ~[pulsar-io-kafka-connect-adaptor-2.6.0-SNAPSHOT.jar:?]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:285)
 ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        ... 1 more
   12:24:24,502 ERROR [ztore-data/debezium-local/debezium-mysql-source-0] 
[instance: 0] JavaInstanceRunnable - 
[ztore-data/debezium-local/debezium-mysql-source:0] Uncaught exception in Java 
Instance
   java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
java.lang.Exception: Sink Error
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:464)
 ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246)
 [pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
   Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: 
Sink Error
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_221]
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
~[?:1.8.0_221]
        at 
org.apache.pulsar.io.kafka.connect.KafkaConnectSource.read(KafkaConnectSource.java:165)
 ~[?:?]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:460)
 ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        ... 2 more
   Caused by: java.lang.Exception: Sink Error
        at 
org.apache.pulsar.io.kafka.connect.KafkaConnectSource$KafkaSourceRecord.fail(KafkaConnectSource.java:322)
 ~[?:?]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:285)
 ~[pulsar-functions-instance.jar:2.6.0-SNAPSHOT]
        ... 1 more
   12:24:24,503 INFO [ztore-data/debezium-local/debezium-mysql-source-0] 
[instance: 0] JavaInstanceRunnable - Closing instance
   ```
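
One possible reading of the second trace, offered as an assumption rather than a confirmed diagnosis: the failing frame is `JsonDeserializer.deserialize`, called from `KafkaSchema.encode`, and Jackson rejects a payload whose first byte is 0. Confluent's Avro serializer prefixes each payload with a zero magic byte followed by a 4-byte schema ID, so the trace would be consistent with Avro-encoded bytes being handed to a JSON deserializer. The sketch below only illustrates how to tell the two payload kinds apart; the class and method names are hypothetical and are not part of Pulsar or this PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Pulsar or the PR: inspects converter output
// to see whether it looks like Confluent-framed Avro rather than JSON text.
public class ConverterOutputCheck {
    // Confluent wire format: 1 magic byte (0x00), 4-byte big-endian schema ID, then the Avro body.
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5;

    // True when the payload is long enough to hold the header and starts with the magic byte.
    public static boolean looksLikeConfluentAvro(byte[] payload) {
        return payload != null
                && payload.length >= HEADER_LENGTH
                && payload[0] == MAGIC_BYTE;
    }

    // Reads the schema ID from bytes 1..4; callers should check looksLikeConfluentAvro first.
    public static int schemaId(byte[] payload) {
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] avroLike = {0, 0, 0, 0, 1, 2, 4};  // made-up bytes: header plus a short body
        byte[] json = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);

        System.out.println(looksLikeConfluentAvro(avroLike)); // true
        System.out.println(looksLikeConfluentAvro(json));     // false
        System.out.println(schemaId(avroLike));               // 1
    }
}
```

If the bytes reaching `KafkaSchema.encode` pass a check like this, it would suggest the value is still being treated as JSON when AvroConverter is configured.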
