Shubham21k opened a new issue, #10029:
URL: https://github.com/apache/hudi/issues/10029
We are migrating our derived Hudi datasets from Hudi 0.11.1 to 0.13.1.

To give some context on the flow: a Spark job first creates temp materialised tables in parquet format at a temp path. Once that step completes, another Spark job reads the materialised table and writes it incrementally, in hoodie format, to the main table location.

After migrating to Hudi 0.13.1, we encounter exceptions while writing data in hoodie format **when there is a MAP column present in the schema**. If the schema has no map columns, there are no issues at all. After downgrading the table back to the 0.11.1 table version, we can write to the hoodie table again without any issues.
Main hoodie table properties:
`hoodie.table.keygenerator.class : org.apache.hudi.keygen.NonpartitionedKeyGenerator`
`hoodie.table.base.file.format : PARQUET`
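On the writer side this roughly corresponds to the following datasource options (a sketch only; the record key and precombine field names here are illustrative placeholders, not necessarily the exact columns our pipeline uses):

```scala
// Illustrative writer options matching the table properties above.
// Record key / precombine field names are placeholders.
df.write.format("org.apache.hudi").
  option("hoodie.table.name", "clickstream_map_tbl").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.keygenerator.class",
    "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
  option("hoodie.datasource.write.recordkey.field", "payment_reference_id").
  option("hoodie.datasource.write.precombine.field", "session_id").
  mode(SaveMode.Append).
  save("s3://corp-data/derived-data/clickstream_map_tbl/")
```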
**To Reproduce**
Steps to reproduce the behavior:
1. Create a temp table that writes data in parquet format. Make sure to include a map column in the table.
2. Read from the temp table created in the previous step and write incrementally to the hoodie table's S3 path (a fuller standalone sketch follows below):
   - `scala> val df = spark.sql("select * from clickstream_map_tbl_pos_tmp")`
   - `scala> df.write.format("org.apache.hudi").mode(SaveMode.Append).save("s3://corp-data/derived-data/clickstream_map_tbl/")`
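For a self-contained reproduction, something like the following in spark-shell exercises the same write path with a map column (values are hypothetical, and the Hudi writer options are assumed to come from spark defaults as in the sketch above):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.map
import spark.implicits._

// Build a small frame with a MAP<STRING,STRING> column ("event_attributes"),
// mirroring the schema of our temp materialised table.
val df = Seq(
  ("pr-1", "sess-1", "page", "home"),
  ("pr-2", "sess-2", "page", "checkout")
).toDF("payment_reference_id", "session_id", "k", "v")
  .withColumn("event_attributes", map($"k", $"v"))
  .drop("k", "v")

// Append to the hoodie table the same way as step 2 above.
df.write.format("org.apache.hudi")
  .mode(SaveMode.Append)
  .save("s3://corp-data/derived-data/clickstream_map_tbl/")
```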
**Expected behavior**
Data should be written to the hoodie table without any issues.
**Environment Description**
* Hudi version : 0.13.1
* Spark version : 3.2.1
* Hive version : 3.1.3
* Hadoop version : NA
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : NO
**Additional context**
For step 1 we use Presto-on-Spark jars to create the temp materialised tables:
```
spark-submit \
  --conf spark.app.name=POS(1/2)-clickstream_map_tbl \
  --conf spark.executor.cores=4 \
  --conf spark.task.cpus=4 \
  --conf spark.driver.cores=2 \
  --conf spark.executor.memory=27g \
  --conf spark.driver.memory=4g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.initialExecutors=3 \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=8 \
  --conf spark.task.maxFailures=20 \
  --conf spark.rdd.compress=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --conf spark.sql.shuffle.partitions=360 \
  --conf spark.core.connection.ack.wait.timeout=180s \
  --conf spark.files.fetchTimeout=180s \
  --conf spark.cleaner.periodicGC.interval=10min \
  --conf spark.yarn.max.executor.failures=5 \
  --conf spark.driver.memoryOverhead=400m \
  --conf spark.executor.memoryOverhead=2024m \
  --conf spark.executor.instances=3 \
  --class com.facebook.presto.spark.launcher.PrestoSparkLauncher /home/hadoop/presto-spark-launcher-0.270.2.jar \
  --package /home/hadoop/presto-spark-package-0.270.2.tar.gz \
  --catalog awsdatacatalog \
  --schema default \
  --config /home/hadoop/config.properties \
  --catalogs /home/hadoop/catalog_medium \
  --file /tmp/query_1699512610.7256024.sql
```
The `--file` argument points to a simple insert query that reads from a view.

Step 2 hoodie table schema:
```
Schema:
message hoodie.clickstream_tbl_record {
  optional binary _hoodie_commit_time (STRING);
  optional binary _hoodie_commit_seqno (STRING);
  optional binary _hoodie_record_key (STRING);
  optional binary _hoodie_partition_path (STRING);
  optional binary _hoodie_file_name (STRING);
  optional binary payment_reference_id (STRING);
  optional binary session_id (STRING);
  optional group event_attributes (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required binary key (STRING);
      optional binary value (STRING);
    }
  }
}
```
**Stacktrace**
```
23/11/09 09:43:34 ERROR TaskSetManager: Task 0 in stage 57.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 57.0 failed 4 times, most recent failure: Lost task 0.3 in stage 57.0 (TID 7273) (ip-172-31-65-174.ap-south-1.compute.internal executor 36): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:336)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:251)
  at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
  at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
  at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:133)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
  at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:156)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:372)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:363)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
  ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
  at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:73)
  at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
  ... 31 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
  at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35)
  at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:65)
  ... 32 more
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://corp-data/derived-data/clickstream_map_tbl/c4f77548-59de-45b5-ae8d-fc4f2c3adf08-0_0-22-2300_20231109055404147.parquet
  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
  ... 34 more
Caused by: java.lang.ArrayIndexOutOfBoundsException
```
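For what it's worth, one way to check whether the base file named in the exception is readable at all outside Hudi's merge path is to open it directly with Spark's plain parquet reader (a sketch only; the column name is taken from the schema above):

```scala
// Read the base file from the stacktrace directly with Spark's parquet
// reader, bypassing Hudi's merge-path record reader.
val baseFile = "s3://corp-data/derived-data/clickstream_map_tbl/" +
  "c4f77548-59de-45b5-ae8d-fc4f2c3adf08-0_0-22-2300_20231109055404147.parquet"
val probe = spark.read.parquet(baseFile)
probe.printSchema()
probe.selectExpr("count(1)", "count(event_attributes)").show(false)
```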