sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r497257812
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
}
}
}
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So, we need to remove that namespace so that a reader schema without a namespace does not throw an error like this one:
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema = {
Review comment:
@bvaradar
In Delta Streamer, we currently have the three flows below:
1) [No transformation](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335)
2) [Transformation with userProvidedSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L315)
3) [Transformation without userProvidedSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L323)
Only a schema converted from Spark data types to an Avro schema has this namespace added to fixed fields. In Delta Streamer, we currently use the user-provided schema ([userProvidedSchemaProvider.targetSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L568)) to convert [bytes to avro](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L126), except in the third flow (Transformation without userProvidedSchema). In that case, we [derive the schema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L327) from the Spark data types. So, the backward compatibility issue arises only when we use a transformer and no user-provided schema.
Below is an example of an Avro fixed field without and with a namespace.
`{"name":"height","type":{"type":"fixed","name":"fixed","size":5,"logicalType":"decimal","precision":10,"scale":6}}`
`{"name":"height","type":{"type":"fixed","name":"fixed","namespace":"hoodie.source.hoodie_source.height","size":5,"logicalType":"decimal","precision":10,"scale":6}}`
Both of these result in the same Parquet schema:
`required fixed_len_byte_array(5) height (DECIMAL(10,6));`
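To make the difference concrete, here is a standalone check (sketched in Python with only the standard library, not the Scala/Avro API the PR uses; `strip_fixed_namespace` is a hypothetical mirror of the PR's logic, not its actual implementation) showing the two definitions above differ only in the `namespace` attribute:

```python
import json

# The two fixed-field definitions from the example above: one written by
# SchemaConverters.toAvroType (namespace added), one without a namespace.
with_ns = json.loads(
    '{"name":"height","type":{"type":"fixed","name":"fixed",'
    '"namespace":"hoodie.source.hoodie_source.height","size":5,'
    '"logicalType":"decimal","precision":10,"scale":6}}'
)
without_ns = json.loads(
    '{"name":"height","type":{"type":"fixed","name":"fixed","size":5,'
    '"logicalType":"decimal","precision":10,"scale":6}}'
)

def strip_fixed_namespace(node):
    """Recursively drop the 'namespace' attribute from fixed-type nodes.

    Hypothetical JSON-level mirror of what removeNamespaceFromFixedFields
    would do on org.apache.avro.Schema objects.
    """
    if isinstance(node, dict):
        if node.get("type") == "fixed":
            node.pop("namespace", None)
        for value in node.values():
            strip_fixed_namespace(value)
    elif isinstance(node, list):
        for item in node:
            strip_fixed_namespace(item)
    return node

# After stripping the namespace, the two definitions are identical,
# which is consistent with both producing the same Parquet schema.
assert strip_fixed_namespace(with_ns) == without_ns
```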
As we can see, the namespace in the fixed field does not seem to have any impact on the Parquet schema. So maybe the HoodieFileReader in the MergeHelper file you referred to shouldn't have any issue?
In general, it looks like Parquet files in an existing Hudi dataset would not have the issue, so we can rule out the COPY ON WRITE table type. But in the case of a MERGE ON READ table, I could see the issue for the third flow (Transformation without userProvidedSchema). Below is the stack trace.
```
51511 [Executor task launch worker for task 502] ERROR org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner - Got exception when reading log file
org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
	at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
	at org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
	at org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
	at org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
	at org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
	at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
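The failure visible in the trace comes from Avro schema resolution matching fixed types by their *full name* (namespace plus name), so `hoodie.source.hoodie_source.height.fixed` and plain `fixed` are treated as different types. A minimal illustration of that mismatch (Python, standard library only; `full_name` is a made-up helper, not an Avro API):

```python
import json

# Writer-side fixed type, as produced via SchemaConverters.toAvroType
# (namespace added), and reader-side fixed type with no namespace.
writer_fixed = json.loads(
    '{"type":"fixed","name":"fixed",'
    '"namespace":"hoodie.source.hoodie_source.height","size":5}'
)
reader_fixed = json.loads('{"type":"fixed","name":"fixed","size":5}')

def full_name(node):
    """Full name of a named Avro type: 'namespace.name', or just 'name'."""
    ns = node.get("namespace")
    return f"{ns}.{node['name']}" if ns else node["name"]

# The mismatch ResolvingDecoder reports: "Found <writer>, expecting <reader>".
assert full_name(writer_fixed) == "hoodie.source.hoodie_source.height.fixed"
assert full_name(reader_fixed) == "fixed"
assert full_name(writer_fixed) != full_name(reader_fixed)
```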
In summary, it looks like the third flow (Transformation without userProvidedSchema) produces a different output schema in the log files compared to the other two flows when there are fixed fields. This means that if we want to change from the third flow to, say, the first flow (by removing the transformation), we already have a problem, since log files in a MERGE ON READ table will have a different schema if there are fixed fields. This PR may cause a backward compatibility issue for the third flow, but it would make sure we produce the same schema regardless of which flow we use.
In case you have a better suggestion to make this work without causing issues in existing datasets for the third flow, please let me know; I'm happy to update the PR.