[
https://issues.apache.org/jira/browse/HUDI-7388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17921465#comment-17921465
]
Brandon Dahler commented on HUDI-7388:
--------------------------------------
I was able to re-test against 1.0.0 and noticed that a slightly different exception is now thrown:
{code:java}
25/01/24 10:26:17 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 7)
java.lang.RuntimeException: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to convert JSON string to Avro record: {"volume": 483951, "symbol": "MSFT", "ts": "2018-08-31 09:30:00", "month": "08", "high": 111.74, "low": 111.55, "key": "MSFT_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close": 111.72, "open": 111.55, "day": "31"}; schema: {"type":"record","name":"stock_ticks","fields":[{"name":"volume","type":"long"},{"name":"ts","type":"string"},{"name":"symbol","type":"string"},{"name":"year","type":"int"},{"name":"month","type":"string"},{"name":"high","type":"double"},{"name":"low","type":{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}},{"name":"key","type":"string"},{"name":"date","type":"string"},{"name":"close","type":"double"},{"name":"open","type":"double"},{"name":"day","type":"string"}]}
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:123)
    at org.apache.hudi.common.util.collection.ClosableIterator$1.next(ClosableIterator.java:41)
    at org.apache.hudi.common.util.collection.MappingIterator.next(MappingIterator.java:44)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to convert JSON string to Avro record: {"volume": 483951, "symbol": "MSFT", "ts": "2018-08-31 09:30:00", "month": "08", "high": 111.74, "low": 111.55, "key": "MSFT_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close": 111.72, "open": 111.55, "day": "31"}; schema: {"type":"record","name":"stock_ticks","fields":[{"name":"volume","type":"long"},{"name":"ts","type":"string"},{"name":"symbol","type":"string"},{"name":"year","type":"int"},{"name":"month","type":"string"},{"name":"high","type":"double"},{"name":"low","type":{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}},{"name":"key","type":"string"},{"name":"date","type":"string"},{"name":"close","type":"double"},{"name":"open","type":"double"},{"name":"day","type":"string"}]}
    at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:122)
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:33)
    at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
    at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    ... 21 more
Caused by: org.apache.hudi.exception.HoodieJsonToAvroConversionException: failed to convert json to avro
    at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:119)
    at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:118)
    ... 27 more
Caused by: org.apache.hudi.exception.HoodieJsonConversionException: failed to convert json to avro
    at org.apache.hudi.avro.processors.JsonFieldProcessor.convertField(JsonFieldProcessor.java:33)
    at org.apache.hudi.avro.MercifulJsonConverter.convertField(MercifulJsonConverter.java:198)
    at org.apache.hudi.avro.MercifulJsonConverter.convertJsonField(MercifulJsonConverter.java:193)
    at org.apache.hudi.avro.MercifulJsonConverter.convertJsonToAvro(MercifulJsonConverter.java:136)
    at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:117)
    ... 28 more {code}
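For reference, the step that fails here is conceptually just decoding a plain JSON number into the bytes-backed decimal the target schema declares for {{low}}. Below is a minimal sketch of that conversion using Avro's own {{Conversions.DecimalConversion}} rather than Hudi's {{MercifulJsonConverter}} (class name and sample value are illustrative only; note that the record's actual value 111.55 would not even fit decimal(4,2), which tops out at 99.99):
{code:java}
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class JsonDecimalSketch {
  public static void main(String[] args) {
    // Field schema for "low" in the target schema above: bytes + decimal(4,2).
    Schema bytesDecimal = LogicalTypes.decimal(4, 2)
        .addToSchema(Schema.create(Schema.Type.BYTES));

    // A JSON number arrives as a plain decimal; it must match the schema's
    // scale (2) before it can be encoded as an unscaled integer.
    BigDecimal low = new BigDecimal("11.55").setScale(2, RoundingMode.UNNECESSARY);

    ByteBuffer encoded = new Conversions.DecimalConversion()
        .toBytes(low, bytesDecimal, bytesDecimal.getLogicalType());

    System.out.println("encoded as " + encoded.remaining() + " bytes");
  }
}
{code}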
> [Regression] Records with a field of logical type decimal can no longer be ingested via HoodieStreamer
> -------------------------------------------------------------------------------------------------------
>
> Key: HUDI-7388
> URL: https://issues.apache.org/jira/browse/HUDI-7388
> Project: Apache Hudi
> Issue Type: Bug
> Components: deltastreamer
> Affects Versions: 1.0.0-beta1, 0.14.1, 0.15.0, 1.0.0-beta2, 1.0.0
> Reporter: Brandon Dahler
> Priority: Major
> Fix For: 0.16.0
>
> Attachments: decimal-repro.properties, schema.avsc, spark.log
>
>
> h2. Problem
> When attempting to ingest records in Hudi 0.14.1 with an Avro target schema that includes a field using the [decimal|https://avro.apache.org/docs/1.11.0/spec.html#Decimal] logical type, an exception is thrown:
> {code:java}
> 24/02/06 21:30:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.RuntimeException: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
>     at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
>     at org.apache.hudi.utilities.streamer.HoodieStreamerUtils.lambda$null$a903797$1(HoodieStreamerUtils.java:92)
>     at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
>     at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>     at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>     at org.apache.spark.scheduler.Task.run(Task.scala:139)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: org.apache.avro.AvroRuntimeException: cannot support rewrite value for schema type: {"type":"fixed","name":"fixed","namespace":"stock_ticks.low","size":2,"logicalType":"decimal","precision":4,"scale":2} since the old schema type is: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
>     at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryTypeWithDiffSchemaType(HoodieAvroUtils.java:1088)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewritePrimaryType(HoodieAvroUtils.java:1006)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:951)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:899)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:877)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:847)
>     at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordDeep(HoodieAvroUtils.java:1259)
>     at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
>     at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
>     at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
>     ... 21 more {code}
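> The rejected rewrite itself is mechanically straightforward, since both representations encode the same unscaled value. Below is a hypothetical sketch of the missing bytes-to-fixed conversion using Avro's {{Conversions.DecimalConversion}}; the class and method names are illustrative, not Hudi's actual code:
> {code:java}
> import java.math.BigDecimal;
> import java.nio.ByteBuffer;
>
> import org.apache.avro.Conversions;
> import org.apache.avro.LogicalTypes;
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericFixed;
>
> public class DecimalRewriteSketch {
>   private static final Conversions.DecimalConversion CONV = new Conversions.DecimalConversion();
>
>   // Old ("bytes") and new ("fixed") shapes of the same decimal(4,2) type,
>   // taken from the exception message above.
>   private static final Schema OLD = LogicalTypes.decimal(4, 2)
>       .addToSchema(Schema.create(Schema.Type.BYTES));
>   private static final Schema NEW = LogicalTypes.decimal(4, 2)
>       .addToSchema(Schema.createFixed("fixed", null, "stock_ticks.low", 2));
>
>   // Decode the bytes-backed decimal, then re-encode it fixed-backed.
>   static GenericFixed bytesToFixed(ByteBuffer oldValue) {
>     BigDecimal decoded = CONV.fromBytes(oldValue, OLD, OLD.getLogicalType());
>     return CONV.toFixed(decoded, NEW, NEW.getLogicalType());
>   }
>
>   public static void main(String[] args) {
>     // Round-trip a sample value that fits decimal(4,2).
>     ByteBuffer bytes = CONV.toBytes(new BigDecimal("11.55"), OLD, OLD.getLogicalType());
>     System.out.println(bytesToFixed(bytes));
>   }
> }
> {code}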
> h2. Reproduction Steps
> 1. Set up a clean Spark install (this assumes the Spark 3.4.2 tarball has already been downloaded to the home directory)
> {code:java}
> mkdir /tmp/hudi-decimal-repro
> cd /tmp/hudi-decimal-repro
> tar -xvzf ~/spark-3.4.2-bin-hadoop3.tgz{code}
> 2. Create a minimal schema file based on [the demo schema|https://github.com/apache/hudi/blob/release-0.14.1/docker/demo/config/schema.avsc]. The only change is the {{type}} of the field named {{low}}.
> {code:java}
> echo '{
> "type":"record",
> "name":"stock_ticks",
> "fields":[{
> "name": "volume",
> "type": "long"
> }, {
> "name": "ts",
> "type": "string"
> }, {
> "name": "symbol",
> "type": "string"
> },{
> "name": "year",
> "type": "int"
> },{
> "name": "month",
> "type": "string"
> },{
> "name": "high",
> "type": "double"
> },{
> "name": "low",
> "type": {
> "type": "bytes",
> "logicalType": "decimal",
> "precision": 4,
> "scale": 2
> }
> },{
> "name": "key",
> "type": "string"
> },{
> "name": "date",
> "type":"string"
> }, {
> "name": "close",
> "type": "double"
> }, {
> "name": "open",
> "type": "double"
> }, {
> "name": "day",
> "type":"string"
> }
> ]}' > schema.avsc{code}
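> As a sanity check, the schema can be parsed and the {{low}} field's logical type inspected (a hypothetical verification step, not part of the repro):
> {code:java}
> import java.io.File;
>
> import org.apache.avro.LogicalType;
> import org.apache.avro.Schema;
>
> public class SchemaCheck {
>   public static void main(String[] args) throws Exception {
>     Schema schema = new Schema.Parser()
>         .parse(new File("/tmp/hudi-decimal-repro/schema.avsc"));
>     // The only non-primitive field: bytes backed by the decimal logical type.
>     LogicalType lt = schema.getField("low").schema().getLogicalType();
>     System.out.println(lt.getName()); // prints "decimal"
>   }
> }
> {code}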
> 3. Create a minimal properties file
> {code:java}
> echo "hoodie.datasource.write.recordkey.field=key
> hoodie.datasource.write.partitionpath.field=date
> hoodie.table.recordkey.fields=key
> hoodie.table.partition.fields=date
> hoodie.streamer.schemaprovider.source.schema.file=/tmp/hudi-decimal-repro/schema.avsc
> hoodie.streamer.schemaprovider.target.schema.file=/tmp/hudi-decimal-repro/schema.avsc
> hoodie.streamer.source.dfs.root=/tmp/hudi-decimal-repro/data" > decimal-repro.properties{code}
> 4. Copy data file from the docker demo
> {code:java}
> mkdir data
> cd data
> wget https://raw.githubusercontent.com/apache/hudi/release-0.14.1/docker/demo/data/batch_1.json
>
> cd ..{code}
> 5. Run HoodieStreamer
> {code:java}
> spark-3.4.2-bin-hadoop3/bin/spark-submit \
> --packages org.apache.hudi:hudi-utilities-slim-bundle_2.12:0.15.0,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0 \
> --conf spark.kryoserializer.buffer.max=200m \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --class org.apache.hudi.utilities.streamer.HoodieStreamer \
> spark-3.4.2-bin-hadoop3/examples/jars/spark-examples_2.12-3.4.2.jar \
> --table-type COPY_ON_WRITE \
> --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
> --target-base-path /tmp/hudi-decimal-repro/table \
> --target-table table \
> --props /tmp/hudi-decimal-repro/decimal-repro.properties \
> --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider {code}
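> (The {{spark-examples}} jar presumably serves only as a placeholder application jar here: spark-submit requires one, while the HoodieStreamer class itself is resolved from the bundles pulled in via {{--packages}}.)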
> h3. Expected Results
> The command runs successfully, data is ingested into {{/tmp/hudi-decimal-repro/table}}, and data files exist under {{/tmp/hudi-decimal-repro/table/2018/08/31/}}.
> h3. Actual Results
> The command fails with the exception above, and no data is ingested into the table. The table is left with a dangling commit in the requested state.
> Logs of the attempted run are attached as spark.log.
> h2. Additional Information
> Based on my own testing, this issue does not appear in versions 0.12.2 through 0.14.0. It affects the 0.14.1, 0.15.0, 1.0.0-beta1, and 1.0.0-beta2 releases (and, per the comment above, 1.0.0 as well).