sijie commented on a change in pull request #6034: [Pulsar IO][Issue 
5633]Support avro schema for debezium connector
URL: https://github.com/apache/pulsar/pull/6034#discussion_r376143520
 
 

 ##########
 File path: 
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
 ##########
 @@ -188,15 +198,48 @@ public void close() {
         @Getter
         Optional<String> destinationTopic;
 
+        private final AvroData avroData;
+
+        private org.apache.pulsar.kafka.shade.avro.Schema keyAvroSchema;
+
+        private org.apache.pulsar.kafka.shade.avro.Schema valueAvroSchema;
+
+        private final KafkaSchema keySchema;
+
+        private final KafkaSchema valueSchema;
+
+        private byte[] keyBytes;
+
+        private byte[] valueBytes;
+
         KafkaSourceRecord(SourceRecord srcRecord) {
-            byte[] keyBytes = keyConverter.fromConnectData(
+            keyBytes = keyConverter.fromConnectData(
                 srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
-            byte[] valueBytes = valueConverter.fromConnectData(
+            valueBytes = valueConverter.fromConnectData(
                 srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
+            this.avroData = new AvroData(1000);
             this.key = keyBytes != null ? 
Optional.of(Base64.getEncoder().encodeToString(keyBytes)) : Optional.empty();
             this.value = new KeyValue(keyBytes, valueBytes);
 
             this.topicName = Optional.of(srcRecord.topic());
+            String keyName = this.topicName.get() + "-key";
+            String valueName = this.topicName.get() + "-value";
+
+            if (readerCache.getIfPresent(keyName) == null
 
 Review comment:
   actually I think the cache here is incorrect. you need a cache from kafka 
connect schema to pulsar schema.
   
   ```
   Cache<org.apache.kafka.connect.data.Schema, KafkaSchema>
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to