sijie commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r585053400
##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
@Override
protected Properties beforeCreateConsumer(Properties props) {
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+ props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+ String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+ // replace KafkaAvroDeserializer with our custom implementation
+ if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+ }
+
log.info("Created kafka consumer config : {}", props);
return props;
}
@Override
- public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
- return record.value();
+ public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+ Object value = consumerRecord.value();
+ if (value instanceof BytesWithAvroPulsarSchema) {
+ return ((BytesWithAvroPulsarSchema) value).getValue();
+ }
+ return value;
+ }
+
+ @Override
+ public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+ Object value = consumerRecord.value();
+ if (value instanceof BytesWithAvroPulsarSchema) {
+ return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+ } else {
+ return org.apache.pulsar.client.api.Schema.BYTES;
+ }
+ }
+
+ public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+ private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+ @Override
+ protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+ if (payload == null) {
+ return null;
+ } else {
+ int id = -1;
+ try {
+ ByteBuffer buffer = ByteBuffer.wrap(payload);
+ buffer.get(); // magic number
+ id = buffer.getInt();
+ String subject = getSubjectName(topic, isKey != null ? isKey : false);
+ Schema schema = this.schemaRegistry.getBySubjectAndId(subject, id);
Review comment:
Why not implement `Source<ByteBuf>` or `Source<ByteBuffer>`?
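One way to read the `Source<ByteBuffer>` suggestion: instead of copying the Avro body into a new `byte[]`, the source could hand out a `ByteBuffer` view over the Kafka payload. The sketch below assumes the Confluent Schema Registry wire format (one magic byte `0x0` followed by a 4-byte big-endian schema id), which is what the `buffer.get(); // magic number` / `buffer.getInt()` parsing in the diff reads. The class name `SchemaIdAndBody` is hypothetical; it is not part of the PR or of Pulsar.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical helper (not part of the PR or of Pulsar): splits a payload
 * framed in the (assumed) Confluent Schema Registry wire format into its
 * schema id and a read-only view of the remaining Avro bytes, without copying.
 */
final class SchemaIdAndBody {
    // Assumed wire format: 1 magic byte (0x0) + 4-byte big-endian schema id.
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 1 + Integer.BYTES;

    final int schemaId;
    final ByteBuffer body; // shares the backing array of the original payload

    private SchemaIdAndBody(int schemaId, ByteBuffer body) {
        this.schemaId = schemaId;
        this.body = body;
    }

    static SchemaIdAndBody parse(byte[] payload) {
        if (payload == null || payload.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Payload too short for a schema-registry header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload); // big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        // slice() starts at the current position (just past the header) and
        // shares the same backing array, so the Avro body is not copied.
        ByteBuffer body = buffer.slice().asReadOnlyBuffer();
        return new SchemaIdAndBody(schemaId, body);
    }
}
```

The trade-off of a view like this is that it keeps the whole Kafka payload array reachable for as long as the buffer is held downstream.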
##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
@Override
protected Properties beforeCreateConsumer(Properties props) {
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+ props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+ String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+ // replace KafkaAvroDeserializer with our custom implementation
+ if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+ }
+
log.info("Created kafka consumer config : {}", props);
return props;
}
@Override
- public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
- return record.value();
+ public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+ Object value = consumerRecord.value();
+ if (value instanceof BytesWithAvroPulsarSchema) {
+ return ((BytesWithAvroPulsarSchema) value).getValue();
+ }
+ return value;
+ }
+
+ @Override
+ public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+ Object value = consumerRecord.value();
+ if (value instanceof BytesWithAvroPulsarSchema) {
+ return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
Review comment:
The deserializer is constructed by reflection, and there are a lot of
unknowns about how Kafka will construct it. Hence I would suggest keeping
the deserializer as simple as possible: all it needs to do is get the schema
id and the remaining content. See the example at
https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/serde/KafkaSchemaAndBytesDeserializer.java
You don't need to pass the cache into the deserializer. You can implement
the logic of extracting the schema in the `extractSchema` method instead. That
way the schema cache is maintained in the connector, and you can also fetch the
schema information on the same Pulsar thread, rather than hiding the logic deep
inside a deserializer where you don't know which thread the deserialize method
will run on.
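A minimal sketch of the split suggested above, kept dependency-free so it runs on its own: a stateless deserializer that only returns the schema id plus the remaining bytes, and a connector-side class that owns the schema cache and resolves the schema in `extractSchema`. All names here (`SchemaIdAndBytes`, `SchemaIdAndBytesDeserializer`, `SchemaResolvingSource`) are hypothetical and are not taken from the PR, from Pulsar, or from the linked streamnative class. The `Function<Integer, S>` is an assumption standing in for a schema-registry lookup by id plus conversion to a Pulsar schema, and the wire format (magic byte `0x0`, 4-byte big-endian id) is assumed to match the diff's parsing.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical value returned by the deserializer: schema id + remaining bytes, nothing else.
final class SchemaIdAndBytes {
    final int schemaId;
    final ByteBuffer bytes;

    SchemaIdAndBytes(int schemaId, ByteBuffer bytes) {
        this.schemaId = schemaId;
        this.bytes = bytes;
    }
}

// Stateless: no registry client, no cache. In Kafka this would implement
// org.apache.kafka.common.serialization.Deserializer; omitted to stay self-contained.
final class SchemaIdAndBytesDeserializer {
    Object deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        if (data.length < 5 || buffer.get() != 0) {
            throw new IllegalArgumentException("Not in the assumed schema-registry wire format");
        }
        int schemaId = buffer.getInt();
        return new SchemaIdAndBytes(schemaId, buffer.slice()); // body after the 5-byte header
    }
}

// Connector side: owns the cache and resolves schemas when extractSchema is called.
final class SchemaResolvingSource<S> {
    private final Map<Integer, S> schemaCache = new ConcurrentHashMap<>();
    private final Function<Integer, S> registryLookup; // assumption: lookup by schema id
    private final S bytesSchema; // fallback, playing the role of Schema.BYTES in the diff

    SchemaResolvingSource(Function<Integer, S> registryLookup, S bytesSchema) {
        this.registryLookup = registryLookup;
        this.bytesSchema = bytesSchema;
    }

    S extractSchema(Object value) {
        if (value instanceof SchemaIdAndBytes) {
            // Looked up at most once per schema id, then served from the connector's cache.
            return schemaCache.computeIfAbsent(((SchemaIdAndBytes) value).schemaId, registryLookup);
        }
        return bytesSchema;
    }

    Object extractValue(Object value) {
        if (value instanceof SchemaIdAndBytes) {
            return ((SchemaIdAndBytes) value).bytes;
        }
        return value;
    }
}
```

`ConcurrentHashMap` is used defensively here; if `extractSchema` is only ever called from the connector's own thread, a plain `HashMap` would be enough.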