codelipenghui commented on a change in pull request #10002:
URL: https://github.com/apache/pulsar/pull/10002#discussion_r600420453
##########
File path:
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
##########
@@ -31,85 +31,189 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.FloatDeserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.ShortDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
/**
- * Simple Kafka Source that just transfers the value part of the kafka records
- * as Strings
+ * Kafka Source that transfers the data from Kafka to Pulsar and sets the
Schema type properly.
+ * We use the key and the value deserializer in order to decide the type of
Schema to be set on the topic on Pulsar.
+ * In case of KafkaAvroDeserializer we use the Schema Registry to download
the schema and apply it to the topic.
+ * Please refer to {@link
#getSchemaFromDeserializerAndAdaptConfiguration(String, Properties, boolean)}
for the list
+ * of supported Deserializers.
+ * If you set StringDeserializer for the key then we use the raw key as key
for the Pulsar message.
+ * If you set another Deserializer for the key we use the KeyValue schema
type in Pulsar with the SEPARATED encoding.
+ * This way the Key is stored in the Pulsar key, encoded as base64 string and
with a Schema, the Value of the message
+ * is stored in the Pulsar value with a Schema.
+ * This way there is a one-to-one mapping between Kafka key/value pair and
the Pulsar data model.
*/
@Connector(
name = "kafka",
type = IOType.SOURCE,
- help = "The KafkaBytesSource is used for moving messages from Kafka to
Pulsar.",
+ help = "Transfer data from Kafka to Pulsar.",
configClass = KafkaSourceConfig.class
)
@Slf4j
public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
private AvroSchemaCache schemaCache;
-
- private static final Collection<String> SUPPORTED_KEY_DESERIALIZERS =
-
Collections.unmodifiableCollection(Arrays.asList(StringDeserializer.class.getName()));
-
- private static final Collection<String> SUPPORTED_VALUE_DESERIALIZERS =
-
Collections.unmodifiableCollection(Arrays.asList(ByteArrayDeserializer.class.getName(),
KafkaAvroDeserializer.class.getName()));
+ private Schema keySchema;
+ private Schema valueSchema;
+ private boolean produceKeyValue;
@Override
protected Properties beforeCreateConsumer(Properties props) {
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
log.info("Created kafka consumer config : {}", props);
- String currentKeyDeserializer =
props.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
- if (!SUPPORTED_KEY_DESERIALIZERS.contains(currentKeyDeserializer)) {
- throw new IllegalArgumentException("Unsupported key deserializer:
" + currentKeyDeserializer + ", only " + SUPPORTED_KEY_DESERIALIZERS);
- }
+ keySchema =
getSchemaFromDeserializerAndAdaptConfiguration(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
props, true);
+ valueSchema =
getSchemaFromDeserializerAndAdaptConfiguration(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
props, false);
+
+ boolean needsSchemaCache = keySchema ==
DeferredSchemaPlaceholder.INSTANCE
+ || valueSchema ==
DeferredSchemaPlaceholder.INSTANCE;
- String currentValueDeserializer =
props.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
- if (!SUPPORTED_VALUE_DESERIALIZERS.contains(currentValueDeserializer))
{
- throw new IllegalArgumentException("Unsupported value
deserializer: " + currentValueDeserializer + ", only " +
SUPPORTED_VALUE_DESERIALIZERS);
+ if (needsSchemaCache) {
+ initSchemaCache(props);
}
- // replace KafkaAvroDeserializer with our custom implementation
- if (currentValueDeserializer != null &&
currentValueDeserializer.equals(KafkaAvroDeserializer.class.getName())) {
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ExtractKafkaAvroSchemaDeserializer.class.getName());
- KafkaAvroDeserializerConfig config = new
KafkaAvroDeserializerConfig(props);
- List<String> urls = config.getSchemaRegistryUrls();
- int maxSchemaObject = config.getMaxSchemasPerSubject();
- SchemaRegistryClient schemaRegistryClient = new
CachedSchemaRegistryClient(urls, maxSchemaObject);
- schemaCache = new AvroSchemaCache(schemaRegistryClient);
+ if (keySchema.getSchemaInfo().getType() != SchemaType.STRING) {
+ // if the Key is a String we can use native Pulsar Key
+ // otherwise we use KeyValue schema
+ // that allows you to set a schema for the Key and a schema for
the Value.
+ // using SEPARATED encoding the key is saved into the binary key
+ // so it is used for routing and for compaction
+ produceKeyValue = true;
}
+
return props;
}
+ private void initSchemaCache(Properties props) {
+ KafkaAvroDeserializerConfig config = new
KafkaAvroDeserializerConfig(props);
+ List<String> urls = config.getSchemaRegistryUrls();
+ int maxSchemaObject = config.getMaxSchemasPerSubject();
+ SchemaRegistryClient schemaRegistryClient = new
CachedSchemaRegistryClient(urls, maxSchemaObject);
+ log.info("initializing SchemaRegistry Client, urls:{},
maxSchemasPerSubject: {}", urls, maxSchemaObject);
+ schemaCache = new AvroSchemaCache(schemaRegistryClient);
+ }
+
@Override
- public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
- Object value = consumerRecord.value();
- if (value instanceof BytesWithKafkaSchema) {
- return ((BytesWithKafkaSchema) value).getValue();
- } else if (value instanceof byte[]) {
- return ByteBuffer.wrap((byte[]) value);
- } else if (value == null) {
+ public KafkaRecord buildRecord(ConsumerRecord<Object, Object>
consumerRecord) {
+ if (produceKeyValue) {
+ Object key = extractSimpleValue(consumerRecord.key());
+ Object value = extractSimpleValue(consumerRecord.value());
+ Schema currentKeySchema =
getSchemaFromObject(consumerRecord.key(), keySchema);
+ Schema currentValueSchema =
getSchemaFromObject(consumerRecord.value(), valueSchema);
+ return new KeyValueKafkaRecord(consumerRecord,
+ new KeyValue<>(key, value),
+ currentKeySchema,
+ currentValueSchema);
+
+ } else {
+ Object value = consumerRecord.value();
+ return new KafkaRecord(consumerRecord,
+ extractSimpleValue(value),
+ getSchemaFromObject(value, valueSchema));
+
+ }
+ }
+
+ private static ByteBuffer extractSimpleValue(Object value) {
+ // we have substituted the original Deserializer with
+ // ByteBufferDeserializer in order to save memory copies
+ // so here we can have only a ByteBuffer or at most a
+ // BytesWithKafkaSchema in case of ExtractKafkaAvroSchemaDeserializer
+ if (value == null) {
return null;
+ } else if (value instanceof BytesWithKafkaSchema) {
+ return ((BytesWithKafkaSchema) value).getValue();
+ } else if (value instanceof ByteBuffer) {
+ return (ByteBuffer) value;
} else {
- throw new UnsupportedOperationException("Cannot extract a value
from a " + value.getClass());
+ throw new IllegalArgumentException("Unexpected type from Kafka:
"+value.getClass());
}
}
- @Override
- public org.apache.pulsar.client.api.Schema<ByteBuffer>
extractSchema(ConsumerRecord<Object, Object> consumerRecord) {
- Object value = consumerRecord.value();
+ private Schema<ByteBuffer> getSchemaFromObject(Object value, Schema
fallback) {
if (value instanceof BytesWithKafkaSchema) {
+ // this is a Struct with schema downloaded by the schema registry
+ // the schema may be different from record to record
return schemaCache.get(((BytesWithKafkaSchema)
value).getSchemaId());
} else {
- return Schema.BYTEBUFFER;
+ return fallback;
+ }
+ }
+
+ private static Schema<ByteBuffer>
getSchemaFromDeserializerAndAdaptConfiguration(String key, Properties props,
boolean isKey) {
+ String kafkaDeserializerClass = props.getProperty(key);
+ Objects.requireNonNull(kafkaDeserializerClass);
+
+ // we want to simply transfer the bytes,
+ // by default we override the Kafka Consumer configuration
+ // to pass the original ByteBuffer
+ props.put(key, ByteBufferDeserializer.class.getCanonicalName());
+
+ Schema<?> result;
+ if
(ByteArrayDeserializer.class.getName().equals(kafkaDeserializerClass)
Review comment:
> So you feel strong that we should use the switch?
No, I don't feel strongly; I suggested it only from a syntax perspective. This works for me.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]