[ 
https://issues.apache.org/jira/browse/HUDI-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon Dahler updated HUDI-7388:
---------------------------------
    Affects Version/s: 1.0.0

> [Regression] Records with a field of logical type decimal can no longer be 
> ingested via HoodieStreamer
> ------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-7388
>                 URL: https://issues.apache.org/jira/browse/HUDI-7388
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: deltastreamer
>    Affects Versions: 1.0.0-beta1, 0.14.1, 0.15.0, 1.0.0-beta2, 1.0.0
>            Reporter: Brandon Dahler
>            Priority: Major
>             Fix For: 0.16.0
>
>         Attachments: decimal-repro.properties, schema.avsc, spark.log
>
>
> h2. Problem
> When attempting to ingest records with an Avro target schema that includes a 
> field using the 
> [decimal|https://avro.apache.org/docs/1.11.0/spec.html#Decimal] logical type 
> in Hudi 0.14.1, an exception is thrown:
> {code:java}
> 24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
>         at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
>         at org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
>         at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
>         at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
>         at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
>         at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>         at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>         at org.apache.spark.scheduler.Task.run(Task.scala:139)
>         at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>         at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
>         at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
>         at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
>         at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
>         at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
>         at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
>         at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
>         at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
>         at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
>         at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
>         at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
>         at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
>         ... 21 more {code}
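
The two schemas named in the exception are the two physical containers that the Avro specification allows for the decimal logical type: variable-length `bytes` and fixed-length `fixed`. In both cases the payload is the two's-complement, big-endian encoding of the unscaled integer. The following is a minimal Python sketch of that encoding rule only; the helper names (`unscaled`, `encode_bytes`, `encode_fixed`) are illustrative assumptions and are not part of Hudi or Avro.

```python
from decimal import Decimal


def unscaled(value: Decimal, scale: int) -> int:
    """Return the unscaled integer of `value` for the given scale (12.34 at scale 2 is 1234)."""
    shifted = value.scaleb(scale)
    if shifted != shifted.to_integral_value():
        raise ValueError(f"{value} has more than {scale} fractional digits")
    return int(shifted)


def encode_bytes(n: int) -> bytes:
    """Encode as a `bytes`-backed decimal: shortest two's-complement, big-endian form."""
    magnitude_bits = n.bit_length() if n >= 0 else (~n).bit_length()
    length = magnitude_bits // 8 + 1  # one extra bit is needed for the sign
    return n.to_bytes(length, "big", signed=True)


def encode_fixed(n: int, size: int) -> bytes:
    """Encode as a `fixed`-backed decimal: sign-extended to exactly `size` bytes."""
    return n.to_bytes(size, "big", signed=True)  # OverflowError if it does not fit


# Precision 4, scale 2 and fixed size 2 are the values shown in the exception.
for text in ("12.34", "-0.01"):
    n = unscaled(Decimal(text), 2)
    print(text, "bytes:", encode_bytes(n).hex(), "fixed(2):", encode_fixed(n, 2).hex())
```

The logical value is the same in both containers, but the byte layouts can differ in length (for example, -0.01 is one byte as `bytes` and two bytes as `fixed` of size 2), so moving a record from one to the other requires an explicit conversion. The exception indicates that this conversion path is not handled.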
> h2. Reproduction Steps
> 1. Set up a clean Spark install
> {code:java}
> mkdir /tmp/hudi-decimal-repro
> cd /tmp/hudi-decimal-repro
> tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz{code}
> 2. Create a minimal schema file based on 
> [the demo schema|https://github.com/apache/hudi/blob/release-0.14.1/docker/demo/config/schema.avsc].
> The only change is the {{type}} of the field named {{low}}.
> {code:java}
> echo '{
>   "type":"record",
>   "name":"stock_ticks",
>   "fields":[{
>      "name": "volume",
>      "type": "long"
>   }, {
>      "name": "ts",
>      "type": "string"
>   }, {
>      "name": "symbol",
>      "type": "string"
>   },{
>      "name": "year",
>      "type": "int"
>   },{
>      "name": "month",
>      "type": "string"
>   },{
>      "name": "high",
>      "type": "double"
>   },{
>      "name": "low",
>      "type": {
>        "type": "bytes",
>        "logicalType": "decimal",
>        "precision": 4,
>        "scale": 2
>      }
>   },{
>      "name": "key",
>      "type": "string"
>   },{
>      "name": "date",
>      "type":"string"
>   }, {
>      "name": "close",
>      "type": "double"
>   }, {
>      "name": "open",
>      "type": "double"
>   }, {
>      "name": "day",
>      "type":"string"
>   }
> ]}' > schema.avsc{code}
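
To confirm which fields in a schema file use the decimal logical type, and which physical type backs each one, a short check along these lines may help. This is a sketch using only the Python standard library; `decimal_fields` is a hypothetical helper, and the embedded schema is a trimmed copy of the one above (two fields only) so that the example is self-contained.

```python
import json


def decimal_fields(schema: dict) -> list:
    """Return (field name, physical type, precision, scale) for each decimal field."""
    found = []
    for field in schema.get("fields", []):
        ftype = field["type"]
        # A union is a JSON list of branches; treat a plain type as a one-branch union.
        branches = ftype if isinstance(ftype, list) else [ftype]
        for branch in branches:
            if isinstance(branch, dict) and branch.get("logicalType") == "decimal":
                found.append(
                    (field["name"], branch["type"], branch["precision"], branch.get("scale", 0))
                )
    return found


# Trimmed copy of schema.avsc from step 2; only two of the fields are kept.
SCHEMA_TEXT = """
{
  "type": "record",
  "name": "stock_ticks",
  "fields": [
    {"name": "high", "type": "double"},
    {"name": "low", "type": {"type": "bytes", "logicalType": "decimal",
                             "precision": 4, "scale": 2}}
  ]
}
"""

schema = json.loads(SCHEMA_TEXT)
for name, physical, precision, scale in decimal_fields(schema):
    print(f"{name}: decimal({precision},{scale}) backed by {physical}")
```

For the real file, replace `SCHEMA_TEXT` with the contents of `/tmp/hudi-decimal-repro/schema.avsc`. The result should show `low` backed by `bytes`, which matches the "old schema type" in the exception.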
> 3. Create a minimal properties file
> {code:java}
> echo "hoodie.datasource.write.recordkey.field=key
> hoodie.datasource.write.partitionpath.field=date
> hoodie.table.recordkey.fields=key
> hoodie.table.partition.fields=date
> hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc
> hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc
> hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" > decimal-repro.properties{code}
> 4. Copy data file from the docker demo
> {code:java}
> mkdir data
> cd data
> wget https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json
> cd ..{code}
> 5. Run HoodieStreamer
> {code:java}
> spark-3.4.2-bin-hadoop3/bin/spark-submit \
>    --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.15.0,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0 \
>    --conf spark.kryoserializer.buffer.max=200m \
>    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>    --class org.apache.hudi.utilities.streamer.HoodieStreamer \
>    spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
>    --table-type COPY_ON_WRITE \
>    --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
>    --target-base-path /tmp/hudi-decimal-repro/table \
>    --target-table table \
>    --props /tmp/hudi-decimal-repro/decimal-repro.properties \
>    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider {code}
> h3. Expected Results
> Command runs successfully, data is ingested into 
> {{/tmp/hudi-decimal-repro/table}}, and some files exist under 
> {{/tmp/hudi-decimal-repro/table/2018/08/31/}}.
> h3. Actual Results
> Command fails with an exception; no data is ingested into the table.  The 
> table is left with a hanging commit in the requested state.
> Logs of the attempted run are attached as spark.log.
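
One way to spot the hanging commit is to list the timeline files and look for an instant that has a `.requested` file but no completed file. The sketch below assumes the pre-1.0 timeline layout, where files sit directly under `<table>/.hoodie` and are named `<instant>.<action>[.<state>]`. That layout, the helper name `pending_instants`, and the sample file names in the comments are assumptions for illustration and should be checked against the actual table directory.

```python
import os


def pending_instants(file_names):
    """Return instants that have a .requested file but no completed file.

    Assumes names of the form <instant>.<action>[.<state>], where <instant> is
    all digits (for example a made-up name like 20240206213030123.commit.requested).
    """
    requested, completed = set(), set()
    for name in file_names:
        parts = name.split(".")
        if not parts[0].isdigit():
            continue  # not a timeline instant, e.g. hoodie.properties
        if parts[-1] == "requested":
            requested.add(parts[0])
        elif len(parts) == 2 and parts[1] != "inflight":
            completed.add(parts[0])  # <instant>.<action> marks a completed instant
    return sorted(requested - completed)


# Table path from the reproduction steps; nothing is printed if it does not exist.
timeline_dir = "/tmp/hudi-decimal-repro/table/.hoodie"
if os.path.isdir(timeline_dir):
    for instant in pending_instants(os.listdir(timeline_dir)):
        print("pending instant:", instant)
```

After the failed run described above, this would be expected to report a single instant. An empty result would suggest that the timeline layout differs from the assumption.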
> h2. Additional Information
> This issue does not appear to exist in versions 0.12.2 through 0.14.0 based 
> on my own testing.  It does affect all of 0.14.1, 0.15.0, 1.0.0-beta1, and 
> 1.0.0-beta2 releases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
