pratyakshsharma commented on a change in pull request #765: [WIP] Fix
KafkaAvroSource to use the latest schema
URL: https://github.com/apache/incubator-hudi/pull/765#discussion_r376978554
##########
File path:
hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/SourceSchemaKafkaAvroDecoder.java
##########
@@ -0,0 +1,66 @@
+package com.uber.hoodie.utilities.sources;
+
+import com.uber.hoodie.common.util.TypedProperties;
+import com.uber.hoodie.utilities.UtilHelpers;
+import com.uber.hoodie.utilities.schema.SchemaProvider;
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Properties;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+/** A Kafka decoder that uses the source schema for read. */
+public class SourceSchemaKafkaAvroDecoder extends AbstractKafkaAvroDeserializer
+ implements Decoder<Object> {
+
+ private static final String SCHEMA_PROVIDER_CLASS_PROP =
"hoodie.deltastreamer.schemaprovider.class";
+
+ private final Schema sourceSchema;
+
+ public SourceSchemaKafkaAvroDecoder(VerifiableProperties props) {
+ this.configure(new KafkaAvroDeserializerConfig(props.props()));
+
+ TypedProperties typedProperties = new TypedProperties();
+ copyProperties(typedProperties, props.props());
+
+ try {
+ SchemaProvider schemaProvider =
+ UtilHelpers.createSchemaProvider(
+ props.getString(SCHEMA_PROVIDER_CLASS_PROP), typedProperties);
+ sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Object fromBytes(byte[] bytes) {
+ return deserialize(bytes);
+ }
+
+ @Override
+ protected Object deserialize(
+ boolean includeSchemaAndVersion,
+ String topic,
+ Boolean isKey,
+ byte[] payload,
+ Schema readerSchema)
+ throws SerializationException {
+ if (readerSchema != null) {
+ return super.deserialize(includeSchemaAndVersion, topic, isKey, payload,
readerSchema);
+ }
+
+ return super.deserialize(includeSchemaAndVersion, topic, isKey, payload,
sourceSchema);
Review comment:
@haiminh87 I was wondering whether we could make this configurable: add a
boolean such as readUsingLatestSchema, with a default value of true, that can
be overridden via the TypedProperties instance.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services