sijie commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585053400



##########
File path: 
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+
+        String currentValue = 
props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && 
currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> 
extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends 
KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new 
PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String 
topic, Boolean isKey, byte[] payload, Schema readerSchema) throws 
SerializationException {
+            if (payload == null) {
+                return null;
+            } else {
+                int id = -1;
+                try {
+                    ByteBuffer buffer = ByteBuffer.wrap(payload);
+                    buffer.get(); // magic number
+                    id = buffer.getInt();
+                    String subject = getSubjectName(topic, isKey != null ? 
isKey : false);
+                    Schema schema = 
this.schemaRegistry.getBySubjectAndId(subject, id);

Review comment:
       Why not implement `Source<ByteBuf>` or `Source<ByteBuffer>`?

##########
File path: 
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
 
     @Override
     protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+
+        String currentValue = 
props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && 
currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
         log.info("Created kafka consumer config : {}", props);
         return props;
     }
 
     @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> 
extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();

Review comment:
       The deserializer is constructed by reflection, and there are a lot of 
unknowns about how Kafka constructs it. Hence I would suggest keeping the 
deserializer as simple as possible: all you need is the schema id and the 
remaining content. See the example at 
https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/serde/KafkaSchemaAndBytesDeserializer.java
   
   You don't need to pass the cache into the deserializer. You can implement 
the schema extraction logic in the `extractSchema` method. That way the schema 
cache can be maintained in the connector, and you get the schema information 
in the same Pulsar thread, rather than hiding the logic deep inside a 
deserializer where you don't know where the `deserialize` method will run.
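
   A minimal sketch of that split, assuming the Confluent wire format (one 
magic byte `0`, a 4-byte big-endian schema id, then the Avro payload). All 
class and method names below are hypothetical; they are not taken from the 
linked example or from this PR, and the schema loader is a placeholder 
standing in for a real registry client.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: names are illustrative, not from the PR.
class SchemaIdAndBytesSketch {

    /** Holder for the schema id and a view of the remaining payload. */
    public static final class SchemaIdAndBytes {
        public final int schemaId;
        public final ByteBuffer content;

        SchemaIdAndBytes(int schemaId, ByteBuffer content) {
            this.schemaId = schemaId;
            this.content = content;
        }
    }

    /**
     * What the deserializer would do: read the header and hand back the rest.
     * No registry access and no cache, so it does not matter which thread
     * Kafka runs it on.
     */
    public static SchemaIdAndBytes split(byte[] payload) {
        if (payload == null) {
            return null;
        }
        if (payload.length < 5) {
            throw new IllegalArgumentException("payload too short for header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("unexpected magic byte: " + magic);
        }
        int id = buffer.getInt(); // ByteBuffer defaults to big-endian
        // slice() shares the backing array, so the content is not copied
        return new SchemaIdAndBytes(id, buffer.slice());
    }

    /**
     * What the connector would own: a cache keyed by schema id, consulted
     * from extractSchema. The loader is a placeholder for a registry lookup.
     */
    public static final class SchemaCache<S> {
        private final Map<Integer, S> cache = new ConcurrentHashMap<>();
        private final Function<Integer, S> loader;

        public SchemaCache(Function<Integer, S> loader) {
            this.loader = loader;
        }

        public S get(int schemaId) {
            return cache.computeIfAbsent(schemaId, loader);
        }
    }
}
```

   With this shape, `extractValue` would return the `content` buffer and 
`extractSchema` would call the cache with `schemaId`, so the lookup stays in 
the connector rather than inside the deserializer.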





