Brandon Dahler created HUDI-7388:
------------------------------------
Summary: [Regression] Fields with logical type of decimal can no
longer be ingested
Key: HUDI-7388
URL: https://issues.apache.org/jira/browse/HUDI-7388
Project: Apache Hudi
Issue Type: Bug
Affects Versions: 0.14.1, 1.0.0-beta1
Reporter: Brandon Dahler
Attachments: decimal-repro.properties, schema.avsc, spark.log
h2. Problem
When attempting to ingest record with an Avro target schema which includes a
field that uses the
[decimal|https://avro.apache.org/docs/1.11.0/spec.html#Decimal] logical type in
Hudi 0.14.1, an exception is thrown:
{code:java}
24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot
support rewrite value for schema type:
{"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2}
since the old schema type is:
{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
at
org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
at
org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
at
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value
for schema type:
{"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2}
since the old schema type is:
{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
at
org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
at
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
at
org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
at
org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
at
org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
... 21 more {code}
h2. Reproduction Steps
# Setup clean spark install
{code:java}
mkdir /tmp/hudi-decimal-repro
cd /tmp/hudi-decimal-repro
tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz{code}
# Create a minimal schema file based on [the demo
schema|https://github.com/apache/hudi/blob/release-0.14.1/docker/demo/config/schema.avsc].
The only change is the {{type}} of the field named {{{}low{}}}.
{code:java}
echo '{
"type":"record",
"name":"stock_ticks",
"fields":[{
"name": "volume",
"type": "long"
}, {
"name": "ts",
"type": "string"
}, {
"name": "symbol",
"type": "string"
},{
"name": "year",
"type": "int"
},{
"name": "month",
"type": "string"
},{
"name": "high",
"type": "double"
},{
"name": "low",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 4,
"scale": 2
}
},{
"name": "key",
"type": "string"
},{
"name": "date",
"type":"string"
}, {
"name": "close",
"type": "double"
}, {
"name": "open",
"type": "double"
}, {
"name": "day",
"type":"string"
}
]}' > schema.avsc{code}
# Create a minimal properties file
{code:java}
echo "hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
hoodie.table.recordkey.fields=key
hoodie.table.partition.fields=date
hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc
hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc
hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" >
decimal-repro.properties{code}
# Copy data file from the docker demo
{code:java}
mkdir data
cd data
wget
https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json
cd ..{code}
# Run HoodieStreamer
{code:java}
spark-3.4.2-bin-hadoop3/bin/spark-submit \
--packages
org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.14.1,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1
\
--conf spark.kryoserializer.buffer.max=200m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--class org.apache.hudi.utilities.streamer.HoodieStreamer \
spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonDFSSource \
--target-base-path /tmp/hudi-decimal-repro/table \
--target-table table \
--props /tmp/hudi-decimal-repro/decimal-repro.properties \
--schemaprovider-class
org.apache.hudi.utilities.schema.FilebasedSchemaProvider {code}
h3. Expected Results
Command runs successfully, data is ingested successfully into
{{{}/tmp/hudi-decimal-repro/table{}}}, some files exist under
{{{}/tmp/hudi-decimal-repro/table{}}}/{{{}2018/08/31/{}}}.
h3. Actual Results
Command fails with exception, no data is ingsted into the table. Table left
with a hanging commit at the requested state.
Logs of the attempted run are attached as spark.log
h2. Additional Information
This issue does not appear to exist in versions 0.12.2 through 0.14.0 based on
my own testing. It does affect both the 0.14.1 and 1.0.0-beta1 releases.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)