[ https://issues.apache.org/jira/browse/HUDI-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Brandon Dahler updated HUDI-7388:
---------------------------------
Affects Version/s: 1.0.0
> [Regression] Records with a field of logical type decimal can no longer be
> ingested via HoodieStreamer
> ------------------------------------------------------------------------------------------------------
>
> Key: HUDI-7388
> URL: https://issues.apache.org/jira/browse/HUDI-7388
> Project: Apache Hudi
> Issue Type: Bug
> Components: deltastreamer
> Affects Versions: 1.0.0-beta1, 0.14.1, 0.15.0, 1.0.0-beta2, 1.0.0
> Reporter: Brandon Dahler
> Priority: Major
> Fix For: 0.16.0
>
> Attachments: decimal-repro.properties, schema.avsc, spark.log
>
>
> h2. Problem
> When attempting to ingest records with an Avro target schema that includes a
> field using the
> [decimal|https://avro.apache.org/docs/1.11.0/spec.html#Decimal] logical type
> in Hudi 0.14.1, the following exception is thrown:
> {code:java}
> 24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
> at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
> at org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
> at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
> at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
> at org.apache.spark.scheduler.Task.run(Task.scala:139)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
> at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
> at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
> at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
> at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
> at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
> at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
> ... 21 more {code}
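For context on the failure above: the old schema stores {{low}} as a {{bytes}}-backed decimal. The new (target) schema in the message stores it as a {{fixed}}-backed decimal of size 2. The Avro specification defines both encodings as the two's-complement, big-endian unscaled integer. The {{fixed}} form must occupy exactly its declared size, so a shorter {{bytes}} value has to be sign-extended to fill it. The following minimal, stdlib-only Java sketch illustrates that relationship for decimal(4,2). The class and method names are hypothetical illustrations. They are not Hudi's {{HoodieAvroUtils}} API or Avro's own conversion classes, and the sample value is made up.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.util.Arrays;

/**
 * Hypothetical sketch (not Hudi or Avro API): the same decimal(4,2) value
 * encoded for a bytes-backed and a fixed-backed Avro decimal. Both hold the
 * big-endian two's-complement unscaled integer; fixed is exactly its size.
 */
final class DecimalEncodingSketch {

    private DecimalEncodingSketch() {}

    // bytes-backed decimal: minimal two's-complement bytes of the unscaled value.
    // Throws ArithmeticException if the value has more fractional digits than scale.
    static byte[] toBytesDecimal(BigDecimal value, int scale) {
        return value.setScale(scale, RoundingMode.UNNECESSARY).unscaledValue().toByteArray();
    }

    // fixed-backed decimal: sign-extend the bytes-decimal payload on the left.
    static byte[] bytesToFixed(byte[] bytesDecimal, int fixedSize) {
        if (bytesDecimal.length > fixedSize) {
            throw new IllegalArgumentException(
                "value needs " + bytesDecimal.length + " bytes but fixed size is " + fixedSize);
        }
        byte[] fixed = new byte[fixedSize];
        int offset = fixedSize - bytesDecimal.length;
        byte fill = (byte) (bytesDecimal[0] < 0 ? 0xFF : 0x00);
        Arrays.fill(fixed, 0, offset, fill);
        System.arraycopy(bytesDecimal, 0, fixed, offset, bytesDecimal.length);
        return fixed;
    }

    // Either encoding decodes the same way: unscaled integer plus the schema's scale.
    static BigDecimal decode(byte[] twosComplement, int scale) {
        return new BigDecimal(new BigInteger(twosComplement), scale);
    }

    public static void main(String[] args) {
        int scale = 2;                              // "scale": 2 in schema.avsc
        int fixedSize = 2;                          // "size": 2 in the target schema
        BigDecimal low = new BigDecimal("-0.50");   // hypothetical sample value

        byte[] asBytes = toBytesDecimal(low, scale);
        byte[] asFixed = bytesToFixed(asBytes, fixedSize);

        System.out.println("bytes: " + Arrays.toString(asBytes));   // bytes: [-50]
        System.out.println("fixed: " + Arrays.toString(asFixed));   // fixed: [-1, -50]
        System.out.println("decoded: " + decode(asFixed, scale));   // decoded: -0.50
    }
}
```

With the sample value -0.50, the one-byte {{bytes}} encoding [-50] becomes the two-byte {{fixed}} encoding [-1, -50], and both decode to the same decimal. In other words, the two schema types in the error describe the same logical value.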
> h2. Reproduction Steps
> 1. Set up a clean Spark install (assumes {{spark-3.4.2-bin-hadoop3.tgz}} has already been downloaded to the home directory)
> {code:java}
> mkdir /tmp/hudi-decimal-repro
> cd /tmp/hudi-decimal-repro
> tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz{code}
> 2. Create a minimal schema file based on [the demo schema|https://github.com/apache/hudi/blob/release-0.14.1/docker/demo/config/schema.avsc].
> The only change is the {{type}} of the field named {{low}}, which becomes a
> {{bytes}}-backed {{decimal}} with precision 4 and scale 2.
> {code:java}
> echo '{
> "type":"record",
> "name":"stock_ticks",
> "fields":[{
> "name": "volume",
> "type": "long"
> }, {
> "name": "ts",
> "type": "string"
> }, {
> "name": "symbol",
> "type": "string"
> },{
> "name": "year",
> "type": "int"
> },{
> "name": "month",
> "type": "string"
> },{
> "name": "high",
> "type": "double"
> },{
> "name": "low",
> "type": {
> "type": "bytes",
> "logicalType": "decimal",
> "precision": 4,
> "scale": 2
> }
> },{
> "name": "key",
> "type": "string"
> },{
> "name": "date",
> "type":"string"
> }, {
> "name": "close",
> "type": "double"
> }, {
> "name": "open",
> "type": "double"
> }, {
> "name": "day",
> "type":"string"
> }
> ]}' > schema.avsc{code}
> 3. Create a minimal properties file
> {code:java}
> echo "hoodie.datasource.write.recordkey.field=key
> hoodie.datasource.write.partitionpath.field=date
> hoodie.table.recordkey.fields=key
> hoodie.table.partition.fields=date
> hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc
> hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc
> hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" > decimal-repro.properties{code}
> 4. Copy data file from the docker demo
> {code:java}
> mkdir data
> cd data
> wget https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json
> cd ..{code}
> 5. Run HoodieStreamer
> {code:java}
> spark-3.4.2-bin-hadoop3/bin/spark-submit \
> --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.15.0,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0 \
> --conf spark.kryoserializer.buffer.max=200m \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --class org.apache.hudi.utilities.streamer.HoodieStreamer \
> spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
> --table-type COPY_ON_WRITE \
> --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
> --target-base-path /tmp/hudi-decimal-repro/table \
> --target-table table \
> --props /tmp/hudi-decimal-repro/decimal-repro.properties \
> --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider {code}
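The {{low}} field in the step 2 schema is a decimal with precision 4 and scale 2. Its unscaled value therefore ranges from -9999 to 9999, that is, -99.99 to 99.99. The Avro specification limits a {{fixed}} of n bytes to at most floor(log10(2^(8n-1) - 1)) base-10 digits of precision. The stdlib-only Java sketch below uses hypothetical helper names (not Hudi or Avro API) to compute the smallest size that fits a given precision. For precision 4 it yields 2, which is consistent with {{"size":2}} in the target schema from the error. This report does not confirm how Hudi actually derives that size.

```java
import java.math.BigInteger;

/**
 * Hypothetical sketch (not Hudi or Avro API): smallest Avro "fixed" size, in
 * bytes, that can hold a decimal of the given precision, using the bound from
 * the Avro spec: a fixed of n bytes holds floor(log10(2^(8n-1) - 1)) digits.
 */
final class DecimalFixedSize {

    private DecimalFixedSize() {}

    // Max precision for a fixed of n bytes, based on the largest positive
    // two's-complement value that fits in 8n bits.
    static int maxPrecisionForSize(int n) {
        BigInteger largest = BigInteger.ONE.shiftLeft(8 * n - 1).subtract(BigInteger.ONE);
        // For x >= 1, floor(log10(x)) equals (number of decimal digits of x) - 1.
        return largest.toString().length() - 1;
    }

    // Smallest n whose max precision covers the requested precision.
    static int minFixedSize(int precision) {
        if (precision < 1) {
            throw new IllegalArgumentException("precision must be >= 1");
        }
        int n = 1;
        while (maxPrecisionForSize(n) < precision) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        // prints "decimal(4,2) -> fixed size 2"
        System.out.println("decimal(4,2) -> fixed size " + minFixedSize(4));
    }
}
```

A single byte only reaches 127, which is 2 full digits. Two bytes reach 32767, which covers the 4 digits needed for 99.99.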
> h3. Expected Results
> The command completes successfully, data is ingested into
> {{/tmp/hudi-decimal-repro/table}}, and files exist under
> {{/tmp/hudi-decimal-repro/table/2018/08/31/}}.
> h3. Actual Results
> The command fails with the exception above, and no data is ingested into the
> table. The table is left with an incomplete commit in the requested state.
> Logs of the attempted run are attached as {{spark.log}}.
> h2. Additional Information
> Based on my own testing, this issue does not occur in versions 0.12.2 through
> 0.14.0. It affects the 0.14.1, 0.15.0, 1.0.0-beta1, and 1.0.0-beta2
> releases.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)