sijie commented on a change in pull request #9448:
URL: https://github.com/apache/pulsar/pull/9448#discussion_r577270066
##########
File path:
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
    @Override
    protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
Review comment:
If `KafkaBytesSource` is a connector for transferring bytes with different schemas, why do we need to set a schema registry URL here? `schema.registry.url` is only needed for the AVRO schema.
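For example, something along these lines inside `beforeCreateConsumer` (just a sketch; making the URL mandatory instead of defaulting it to localhost is my assumption):

```java
// Sketch: only care about schema.registry.url when the Avro deserializer is in use,
// and require the user to set it instead of silently defaulting to localhost.
String valueDeserializer = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
if (KafkaAvroDeserializer.class.getName().equals(valueDeserializer)
        && !props.containsKey("schema.registry.url")) {
    throw new IllegalArgumentException(
            "schema.registry.url is required when the value deserializer is KafkaAvroDeserializer");
}
```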
##########
File path:
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
    @Override
    protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
        log.info("Created kafka consumer config : {}", props);
        return props;
    }
    @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+            if (payload == null) {
+                return null;
+            } else {
+                int id = -1;
+                try {
+                    ByteBuffer buffer = ByteBuffer.wrap(payload);
+                    buffer.get(); // magic number
+                    id = buffer.getInt();
+                    String subject = getSubjectName(topic, isKey != null ? isKey : false);
+                    Schema schema = this.schemaRegistry.getBySubjectAndId(subject, id);
Review comment:
This doesn't look correct to me. It will request the schema from the schema registry every time a message is deserialized.
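A rough sketch of the kind of per-id caching I would expect (the class and method names here are illustrative, not part of this PR):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;
import org.apache.pulsar.client.api.Schema;

// Illustrative cache keyed by the Confluent schema id, so the registry lookup
// and the Avro -> Pulsar schema conversion happen only on the first miss per id.
class SchemaByIdCache {
    private final ConcurrentHashMap<Integer, Schema<?>> cache = new ConcurrentHashMap<>();

    Schema<?> get(int schemaId, IntFunction<Schema<?>> loader) {
        return cache.computeIfAbsent(schemaId, loader::apply);
    }
}
```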
##########
File path:
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
    @Override
    protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
        log.info("Created kafka consumer config : {}", props);
        return props;
    }
    @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
Review comment:
I think you can get a schema id and a ByteBuffer, just like what I did in https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/serde/KafkaSchemaAndBytesDeserializer.java. You can then write a Schema wrapper to transfer the ByteBuffer, like what I did in https://github.com/streamnative/pulsar-io-kafka/blob/master/src/main/java/io/streamnative/connectors/kafka/schema/KafkaAvroSchema.java
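To make the idea concrete, here is a minimal sketch of that approach (class names are hypothetical; in this PR the holder role is played by `BytesWithAvroPulsarSchema`):

```java
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative only: strip the Confluent wire-format header (1 magic byte + 4-byte
// schema id) and pass the schema id plus the remaining bytes to the connector,
// without decoding the Avro payload here.
public class SchemaIdAndBytesDeserializer implements Deserializer<SchemaIdAndBytesDeserializer.SchemaIdAndBytes> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public SchemaIdAndBytes deserialize(String topic, byte[] payload) {
        if (payload == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        buffer.get();                   // magic byte
        int schemaId = buffer.getInt(); // schema registry id
        return new SchemaIdAndBytes(schemaId, buffer.slice()); // remaining bytes, no copy
    }

    @Override
    public void close() {
    }

    // Hypothetical holder for the schema id and the undecoded payload.
    public static class SchemaIdAndBytes {
        private final int schemaId;
        private final ByteBuffer data;

        public SchemaIdAndBytes(int schemaId, ByteBuffer data) {
            this.schemaId = schemaId;
            this.data = data;
        }

        public int getSchemaId() {
            return schemaId;
        }

        public ByteBuffer getData() {
            return data;
        }
    }
}
```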
##########
File path:
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
    @Override
    protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
        log.info("Created kafka consumer config : {}", props);
        return props;
    }
    @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+            if (payload == null) {
+                return null;
+            } else {
+                int id = -1;
+                try {
+                    ByteBuffer buffer = ByteBuffer.wrap(payload);
+                    buffer.get(); // magic number
+                    id = buffer.getInt();
+                    String subject = getSubjectName(topic, isKey != null ? isKey : false);
+                    Schema schema = this.schemaRegistry.getBySubjectAndId(subject, id);
Review comment:
I think requesting the schema information should be done in the `extractSchema` call, not in the deserializer.
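Roughly what I have in mind (a sketch; `SchemaIdAndBytes` and `resolveSchema` are illustrative names, with the resolution memoized per schema id at the connector level):

```java
// Sketch: the deserializer only carries the schema id and raw bytes; the connector
// resolves and caches the Pulsar schema here.
@Override
public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
    Object value = consumerRecord.value();
    if (value instanceof SchemaIdAndBytes) {
        // resolveSchema(...) is hypothetical: registry lookup + Avro -> Pulsar
        // schema conversion, cached per schema id in the connector.
        return resolveSchema(((SchemaIdAndBytes) value).getSchemaId());
    }
    return org.apache.pulsar.client.api.Schema.BYTES;
}
```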
##########
File path:
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
    @Override
    protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
        log.info("Created kafka consumer config : {}", props);
        return props;
    }
    @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
Review comment:
Since we are refactoring the Kafka connector to support schemas, we should change the key type to `Object` as well, so we can support key schemas in the future. I understand you want to get this change in as early as you can, and I am fine with that. But we need to be super clear about what we are doing and how we can extend it.
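For example, the extraction hooks could be generalized along these lines (a sketch only; the class and method names are illustrative):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch: key and value both typed as Object so key schemas can be added later
// without changing these signatures again.
public abstract class SchemaAwareKafkaSource {

    public abstract Object extractValue(ConsumerRecord<Object, Object> consumerRecord);

    public abstract org.apache.pulsar.client.api.Schema<?> extractValueSchema(ConsumerRecord<Object, Object> consumerRecord);

    // future extension points once key schemas are supported
    public abstract Object extractKey(ConsumerRecord<Object, Object> consumerRecord);

    public abstract org.apache.pulsar.client.api.Schema<?> extractKeySchema(ConsumerRecord<Object, Object> consumerRecord);
}
```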
##########
File path:
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
    @Override
    protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
        log.info("Created kafka consumer config : {}", props);
        return props;
    }
    @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
+        } else {
+            return org.apache.pulsar.client.api.Schema.BYTES;
+        }
+    }
+
+    public static class NoCopyKafkaAvroDeserializer extends KafkaAvroDeserializer {
+
+        private final PulsarSchemaCache<GenericRecord> schemaCache = new PulsarSchemaCache<>();
+
+        @Override
+        protected Object deserialize(boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
Review comment:
I am not sure we should deserialize the buffer and convert it to a generic record. Since you are already parsing the buffer, we should be able to just transfer the bytes. I will look back at how we did this in our Kafka connector.
##########
File path:
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
    @Override
    protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
        log.info("Created kafka consumer config : {}", props);
        return props;
    }
    @Override
-    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
-        return record.value();
+    public Object extractValue(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getValue();
+        }
+        return value;
+    }
+
+    @Override
+    public org.apache.pulsar.client.api.Schema<?> extractSchema(ConsumerRecord<String, Object> consumerRecord) {
+        Object value = consumerRecord.value();
+        if (value instanceof BytesWithAvroPulsarSchema) {
+            return ((BytesWithAvroPulsarSchema) value).getPulsarSchema();
Review comment:
PulsarAvroSchemaCache should be maintained at the connector level, not
in the deserializer.
##########
File path:
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -43,14 +49,64 @@
    @Override
    protected Properties beforeCreateConsumer(Properties props) {
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent("schema.registry.url", "http://localhost:8081");
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        String currentValue = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        // replace KafkaAvroDeserializer with our custom implementation
+        if (currentValue != null && currentValue.equals(KafkaAvroDeserializer.class.getName())) {
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
+        }
+
Review comment:
I think we should throw an exception if the value deserializer is not supported.
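Something like this in `beforeCreateConsumer`, as a sketch (the exact supported set is my assumption):

```java
// Sketch: fail fast on unsupported value deserializers instead of silently passing
// them through to the Kafka consumer.
String valueDeserializer = props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
if (KafkaAvroDeserializer.class.getName().equals(valueDeserializer)) {
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NoCopyKafkaAvroDeserializer.class.getName());
} else if (!ByteArrayDeserializer.class.getName().equals(valueDeserializer)) {
    throw new IllegalArgumentException("Unsupported value deserializer: " + valueDeserializer
            + "; only ByteArrayDeserializer and KafkaAvroDeserializer are supported");
}
```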
##########
File path:
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/PulsarSchemaCache.java
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.IdentityHashMap;
+
+@Slf4j
+class PulsarSchemaCache<T> {
Review comment:
This is an AVRO schema cache, not a Pulsar schema cache.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]