This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c3a743  [pulsar-io] Fix invalid topic name generation in 
kafka-source-connector (#9035)
1c3a743 is described below

commit 1c3a743749924c7c2b7d865565d1fb3c548511c5
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sun Jan 10 19:24:36 2021 -0800

    [pulsar-io] Fix invalid topic name generation in kafka-source-connector 
(#9035)
    
    ### Motivation
    
    Right now, kafka-source-connector creates invalid topic name which causes 
error while creating producer in debezium io-source.
    
    ```
    21:22:41,772 DEBUG [my-property/us-west/my-ns/debezium-postgres-source-0] 
[instance: 0] JavaInstance - Got result: object: (key = "[B@3bbd472c", value = 
"[B@1aa204ba")
    21:22:41,780 ERROR [my-property/us-west/my-ns/debezium-postgres-source-0] 
[instance: 0] PulsarSink - Failed to create Producer while doing user publish
    
org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: 
Invalid topic name: 'my-property/us-west/my-ns/topic1'
        at 
org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:892)
 ~[org.apache.pulsar-pulsar-client-api-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93)
 ~[org.apache.pulsar-pulsar-client-original-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.createProducer(PulsarSink.java:107)
 ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:117)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
 [?:?]
        at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:115)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:111)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtMostOnceProcessor.newMessage(PulsarSink.java:184)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:295) 
[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:449)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.lambda$processResult$0(JavaInstanceRunnable.java:431)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
 [?:?]
        at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
 [?:?]
        at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
 [?:?]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:422)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:283)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at java.lang.Thread.run(Thread.java:834) [?:?]
    21:22:41,781 INFO [my-property/us-west/my-ns/debezium-postgres-source-0] 
[instance: 0] JavaInstanceRunnable - Encountered exception in sink write:
    java.lang.RuntimeException: 
org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: 
Invalid topic name: 'my-property/us-west/my-ns/topic1'
        at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:124)
 ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
 ~[?:?]
        at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:115)
 ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:111)
 ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtMostOnceProcessor.newMessage(PulsarSink.java:184)
 ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:295) 
~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:449)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.lambda$processResult$0(JavaInstanceRunnable.java:431)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
 [?:?]
        at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
 [?:?]
        at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
 [?:?]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:422)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:283)
 [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at java.lang.Thread.run(Thread.java:834) [?:?]
    Caused by: 
org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: 
Invalid topic name: 'my-property/us-west/my-ns/topic1'
        at 
org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:892)
 ~[org.apache.pulsar-pulsar-client-api-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93)
 ~[org.apache.pulsar-pulsar-client-original-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.createProducer(PulsarSink.java:107)
 ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        at 
org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:117)
 ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
        ... 13 more
    21:22:41,790 INFO [my-property/us-west/my-ns/debezium-postgres-source-0] 
[instance: 0] AvroDataConfig - AvroDataConfig values:
        schemas.cache.config = 1000
        enhanced.avro.schema.support = false
        connect.meta.data = true
    ```
---
 .../org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 38f6a70..5a6a1a6 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -189,7 +189,7 @@ public abstract class AbstractKafkaConnectSource<T> 
implements Source<T> {
         KafkaSchemaWrappedSchema valueSchema;
 
         AbstractKafkaSourceRecord(SourceRecord srcRecord) {
-            this.destinationTopic = Optional.of(topicNamespace + "/" + 
srcRecord.topic());
+            this.destinationTopic = Optional.of("persistent://"+topicNamespace 
+ "/" + srcRecord.topic());
         }
 
         @Override

Reply via email to