vburenin commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r583931865
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
##########
@@ -58,30 +66,67 @@ private static String fetchSchemaFromRegistry(String
registryUrl) throws IOExcep
public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+ this.cacheDisabled = !props.getBoolean(Config.CACHE_SCHEMAS, false);
+ this.injectKafkaFieldSchema =
props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_FIELDS, false);
+ this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+ this.targetRegistryUrl =
config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+ this.noTargetSchema = targetRegistryUrl.equals("null");
}
- private static Schema getSchema(String registryUrl) throws IOException {
- return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+ private static Schema getSchema(String registryUrl, boolean
injectKafkaFieldSchema) throws IOException {
+ Schema schema = new
Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+ if (injectKafkaFieldSchema) {
+ return AvroKafkaSourceHelpers.addKafkaMetadataFields(schema);
+ }
+ return schema;
}
@Override
public Schema getSourceSchema() {
- String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+ if (cacheDisabled) {
+ return getSourceSchemaFromRegistry();
+ }
+ if (sourceSchema == null) {
+ synchronized (this) {
+ if (sourceSchema == null) {
+ sourceSchema = getSourceSchemaFromRegistry();
+ }
+ }
+ }
+ return sourceSchema;
+ }
+
+ @Override
+ public Schema getTargetSchema() {
+ if (noTargetSchema) {
+ return null;
+ }
+ if (cacheDisabled) {
+ return getTargetSchemaFromRegistry();
+ }
+ if (targetSchema == null) {
+ synchronized (this) {
+ if (targetSchema == null) {
+ targetSchema = getTargetSchemaFromRegistry();
+ }
+ }
+ }
+ return targetSchema;
+ }
+
+ private Schema getSourceSchemaFromRegistry() {
try {
- return getSchema(registryUrl);
+ return getSchema(registryUrl, injectKafkaFieldSchema);
Review comment:
Answering your first question:
1. That is correct. Actual data source doesn't have any meta fields unless
they are encoded by user somehow, these fields are extracted on Hudi side
directly from Kafka client and then injected into the deserialized object.
2. The source schema used in the deser class can and should have kafka meta
fields as they are optional and become a part of the record. Without the prior
schema modification I can't even call
'record.put(AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD, obj.offset());' in
a map function as it fails that such field doesn't exist in the schema.
Answering the last question, it is being called by Kafka Fetcher
(org.apache.kafka.clients.consumer.internals.Fetcher)
Actually, let's see what the stack trace is...
```
at
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
at
io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
at
org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at
org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1009)
at
org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:96)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1186)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1500(Fetcher.java:1035)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:544)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:505)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:200)
at
org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:129)
at
org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
at
org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:212)
at
org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
at
org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1334)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
at
org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
at
org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]