Brandon Dahler created HUDI-7388:
------------------------------------

             Summary: [Regression] Fields with logical type of decimal can no 
longer be ingested
                 Key: HUDI-7388
                 URL: https://issues.apache.org/jira/browse/HUDI-7388
             Project: Apache Hudi
          Issue Type: Bug
    Affects Versions: 0.14.1, 1.0.0-beta1
            Reporter: Brandon Dahler
         Attachments: decimal-repro.properties, schema.avsc, spark.log

h2. Problem

When attempting to ingest record with an Avro target schema which includes a 
field that uses the 
[decimal|https://avro.apache.org/docs/1.11.0/spec.html#Decimal] logical type in 
Hudi 0.14.1, an exception is thrown:

 
{code:java}
24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot 
support rewrite value for schema type: 
{"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2}
 since the old schema type is: 
{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
        at 
org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
        at 
org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value 
for schema type: 
{"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2}
 since the old schema type is: 
{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
        at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
        at 
org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
        at 
org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
        at 
org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
        ... 21 more {code}
 

 
h2. Reproduction Steps
 # Setup clean spark install

{code:java}
mkdir /tmp/hudi-decimal-repro
cd /tmp/hudi-decimal-repro
tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz{code}

 # Create a minimal schema file based on [the demo 
schema|https://github.com/apache/hudi/blob/release-0.14.1/docker/demo/config/schema.avsc].
  The only change is the {{type}} of the field named {{{}low{}}}.

{code:java}
echo '{
  "type":"record",
  "name":"stock_ticks",
  "fields":[{
     "name": "volume",
     "type": "long"
  }, {
     "name": "ts",
     "type": "string"
  }, {
     "name": "symbol",
     "type": "string"
  },{
     "name": "year",
     "type": "int"
  },{
     "name": "month",
     "type": "string"
  },{
     "name": "high",
     "type": "double"
  },{
     "name": "low",
     "type": {
       "type": "bytes",
       "logicalType": "decimal",
       "precision": 4,
       "scale": 2
     }
  },{
     "name": "key",
     "type": "string"
  },{
     "name": "date",
     "type":"string"
  }, {
     "name": "close",
     "type": "double"
  }, {
     "name": "open",
     "type": "double"
  }, {
     "name": "day",
     "type":"string"
  }
]}' > schema.avsc{code}

 # Create a minimal properties file

{code:java}
echo "hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
hoodie.table.recordkey.fields=key
hoodie.table.partition.fields=date
hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc
hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc
hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" > 
decimal-repro.properties{code}

 # Copy data file from the docker demo

{code:java}
mkdir data
cd data
wget 
https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json
 
cd ..{code}

 # Run HoodieStreamer

{code:java}
spark-3.4.2-bin-hadoop3/bin/spark-submit \
   --packages 
org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.14.1,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1
 \
   --conf spark.kryoserializer.buffer.max=200m \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --class org.apache.hudi.utilities.streamer.HoodieStreamer \
   spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
   --table-type COPY_ON_WRITE \
   --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
   --target-base-path /tmp/hudi-decimal-repro/table \
   --target-table table \
   --props /tmp/hudi-decimal-repro/decimal-repro.properties \
   --schemaprovider-class 
org.apache.hudi.utilities.schema.FilebasedSchemaProvider {code}

h3. Expected Results

Command runs successfully, data is ingested successfully into 
{{{}/tmp/hudi-decimal-repro/table{}}}, some files exist under 
{{{}/tmp/hudi-decimal-repro/table{}}}/{{{}2018/08/31/{}}}.
h3. Actual Results

Command fails with exception, no data is ingsted into the table.  Table left 
with a hanging commit at the requested state.

Logs of the attempted run are attached as spark.log
h2. Additional Information

This issue does not appear to exist in versions 0.12.2 through 0.14.0 based on 
my own testing.  It does affect both the 0.14.1 and 1.0.0-beta1 releases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to