[
https://issues.apache.org/jira/browse/HUDI-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-8620:
--------------------------------------
Status: In Progress (was: Open)
> ColumnStats decimal record type not supported
> ---------------------------------------------
>
> Key: HUDI-8620
> URL: https://issues.apache.org/jira/browse/HUDI-8620
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: Lin Liu
> Assignee: sivabalan narayanan
> Priority: Blocker
> Fix For: 1.0.1
>
>
> {code:java}
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
> at
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
> at scala.Option.foreach(Option.scala:407)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
> at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
> at
> org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
> at
> org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
> at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
> at
> org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:108)
> at
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:193)
> at
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:164)
> at
> org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
> at
> org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
> at
> org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
> at
> org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:145)
> at
> org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:63)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1413)
> at
> org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:144)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:999)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:1055)
> at
> org.apache.hudi.client.BaseHoodieClient.writeTableMetadata(BaseHoodieClient.java:277)
> ... 114 more
> Caused by: java.lang.UnsupportedOperationException: Unsupported type of the
> value (class org.apache.spark.sql.types.Decimal)
> at
> org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro(HoodieAvroUtils.java:1446)
> at
> org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecord(HoodieMetadataPayload.java:521)
> at
> org.apache.hudi.metadata.HoodieMetadataPayload.lambda$createColumnStatsRecords$5(HoodieMetadataPayload.java:496)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.HashMap$ValueSpliterator.tryAdvance(HashMap.java:1673)
> at
> java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:295)
> at
> java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:207)
> at
> java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:162)
> at
> java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:301)
> at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
> at
> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> at
> org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
> at
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
> at
> org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
> at
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
> at
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
> at
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
> at
> org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
> at
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
> at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
> at org.apache.spark.scheduler.Task.run(Task.scala:141)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
> at
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
> at
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {code}
> {code:java}
> The write statement:
> mdfs.write.format("hudi").
> option("hoodie.table.name", "store_sales").
> option("hoodie.database.name", "tpcds_hudi_1gb").
> option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
> option("hoodie.datasource.write.recordkey.field", "ss_ticket_number").
> option("hoodie.datasource.write.partitionpath.field", "ss_sold_date_sk").
> option("hoodie.datasource.write.precombine.field", "ss_sold_date_sk").
> option("hoodie.datasource.read.use.new.parquet.file.format", "true").
> option("hoodie.file.group.reader.enabled", "true").
> option("hoodie.write.record.positions", "true").
> option("hoodie.metadata.enable", "true").
> option("hoodie.enable.data.skipping", "true").
> option("hoodie.metadata.index.column.stats.enable", "true").
> option("hoodie.table.services.enabled", "true").
> option("hoodie.compact.inline", "true").
> option("hoodie.compact.inline.max.delta.commits", "2").
> option("hoodie.parquet.small.file.limit", "0").
> option("hoodie.clustering.inline", "false").
> option("hoodie.payload.ordering.field", "ss_sold_date_sk").
> option("hoodie.write.record.merge.custom.implementation.classes",
> "org.apache.hudi.DefaultSparkRecordMerger").
> option("hoodie.logfile.data.block.format", "parquet").
> option("hoodie.spark.sql.merge.into.partial.updates", "true").
> mode("Append").
> save(basePath)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)