kapjoshi-cisco commented on issue #5701:
URL: https://github.com/apache/hudi/issues/5701#issuecomment-1139958926
@nsivabalan @n3nash @umehrot2 Kindly suggest what should be done in this
use case; we have been stuck on this issue for a month now.
The existing column schema in the Hudi table was created via `bulk_insert`.
The values for this array column looked like this:
```
.....
{ "id":1, "NWDepStatus": [] }
{ "id":2, "NWDepStatus": null }
....
```
This resulted in the schema below for this column in the Hudi table during
`bulk_insert`:
```
root
|-- NWDepStatus: array (nullable = true)
| |-- element: string (containsNull = true)
```
The new incoming record for the same column is shown below. It is meant to be
saved via `upsert`, with this value:
```json
{
"id": 1,
"NWDepCount": 0,
"NWDepStatus": [
{
"ClassId": "metric.DepStatus",
"Id": 21,
"Name": "MyNW_3",
"ObjectType": "metric.DepStatus",
"Status": "NA"
},
{
"ClassId": "metric.DepStatus",
"Id": 22,
"Name": "MyNW2",
"ObjectType": "metric.DepStatus",
"Status": "NA"
}
]
}
```
This results in the schema below, which differs from the existing schema
saved in Hudi:
```
root
|-- NWDepStatus: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- ClassId: string (nullable = true)
| | |-- Id: long (nullable = true)
| | |-- Name: string (nullable = true)
| | |-- ObjectType: string (nullable = true)
| | |-- Status: string (nullable = true)
```
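The mismatch between the two schemas can be illustrated without Spark. The sketch below is a simplified stand-in for JSON schema inference, not Spark's actual implementation, and `infer_element_type` is a hypothetical helper. It assumes, as the two schemas above suggest, that an array column with no observed elements falls back to `string`, while an array of objects is typed as `struct`.

```python
def infer_element_type(records, column):
    """Return a coarse element type for an array column across a batch."""
    for record in records:
        # Both null and [] contribute no elements to look at.
        values = record.get(column) or []
        if any(isinstance(value, dict) for value in values):
            return "struct"
    # Assumption: with nothing to inspect, the element type falls back to string.
    return "string"


bulk_insert_batch = [
    {"id": 1, "NWDepStatus": []},
    {"id": 2, "NWDepStatus": None},
]
upsert_batch = [
    {
        "id": 1,
        "NWDepCount": 0,
        "NWDepStatus": [
            {"ClassId": "metric.DepStatus", "Id": 21, "Name": "MyNW_3",
             "ObjectType": "metric.DepStatus", "Status": "NA"},
        ],
    },
]

existing = infer_element_type(bulk_insert_batch, "NWDepStatus")
incoming = infer_element_type(upsert_batch, "NWDepStatus")
print(existing, incoming)
```

The two batches disagree on the element type, which is the incompatibility the upsert runs into when it reads the files written by the initial load.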
I even tried altering the existing column's schema before writing the new
records, making it match the schema of the new records (a non-empty array of
structs) while retaining the nulls, but with no success.
```
+-----------+
|NWDepStatus|
+-----------+
|null       |
|null       |
+-----------+
```
Configs are as follows:
```python
# `args` (job arguments) and `prefix` (table path prefix) are defined elsewhere in the job.
commonConfig = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'MdTimestamp',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.table.name': 'hudi-table',
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': args['database_name'],
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.datasource.hive_sync.table': 'hudi' + prefix.replace("/", "_").lower(),
    'hoodie.datasource.hive_sync.enable': 'true',
    'path': 's3://' + args['curated_bucket'] + '/hudi' + prefix,
    # 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
    'hoodie.parquet.small.file.limit': '134217728'
}
unpartitionDataConfig = {
    'hoodie.datasource.hive_sync.partition_extractor_class':
        'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class':
        'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
}
# Used for the initial load.
initLoadConfig = {
    'hoodie.bulkinsert.shuffle.parallelism': 68,
    'hoodie.datasource.write.operation': 'bulk_insert'
}
# Used for the incremental upserts.
incrementalConfig = {
    'hoodie.upsert.shuffle.parallelism': 68,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10
}
```
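The write call further down passes `combinedConf`, which is not defined in this comment. A minimal sketch of how such a dict might be assembled, assuming it is a plain left-to-right merge of the dicts above; `merge_configs` is a hypothetical helper and the dicts here are trimmed placeholders, not the real job configuration.

```python
def merge_configs(*parts):
    """Merge config dicts left to right; later dicts win on duplicate keys."""
    merged = {}
    for part in parts:
        merged.update(part)
    return merged


# Trimmed placeholder versions of the dicts above, for illustration only.
common = {
    'hoodie.table.name': 'hudi-table',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.reconcile.schema': 'true',
}
unpartitioned = {
    'hoodie.datasource.write.keygenerator.class':
        'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
}
incremental = {
    'hoodie.upsert.shuffle.parallelism': 68,
    'hoodie.datasource.write.operation': 'upsert',
}

# Assumption: the incremental run combines the common, unpartitioned and
# incremental settings; the initial load would swap in the bulk_insert dict.
combinedConf = merge_configs(common, unpartitioned, incremental)
print(combinedConf['hoodie.datasource.write.operation'])
```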
I checked issue #2265 and the fix in #2927, but even with the configs given
there as the solution it is not working and fails with the same error:
```python
inputDf.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .option("spark.hadoop.parquet.avro.write-old-list-structure", "false") \
    .option("parquet.avro.write-old-list-structure", "false") \
    .option("hoodie.parquet.avro.write-old-list-structure", "false") \
    .option("hoodie.datasource.write.reconcile.schema", "true") \
    .options(**combinedConf) \
    .mode('append') \
    .save()
```
```
2022-05-27 18:22:12,568 WARN [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logWarning(69)): Lost task 0.0 in stage 363.0 (TID 8061) (172.36.166.181 executor 24): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:174)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:351)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:342)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:315)
    ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
    ... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
    ... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
    at org.apache.parquet.schema.Type.asGroupType(Type.java:207)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
    at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    ... 8 more
2022-05-27 18:22:12,571 INFO [dispatcher-CoarseGrainedScheduler] scheduler.TaskSetManager (Logging.scala:logInfo(57)): Starting task 0.1 in stage 363.0 (TID 8062) (172.36.140.28, executor 15, partition 0, PROCESS_LOCAL, 4444 bytes) taskResourceAssignments Map()
```
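The deepest `Caused by:` entry is the informative one here: `optional binary element (UTF8) is not a group` says the existing Parquet file stores the array element as a string, while the reader schema expects a group (struct). A small sketch, using only the standard library, that pulls the cause chain out of a log string; `cause_chain` is a hypothetical helper and the sample is abbreviated from the trace above.

```python
import re

# Matches "Caused by: <exception class>[: <message>]" at the start of a line.
CAUSE = re.compile(
    r"^Caused by: (?P<cls>[\w.$]+)(?::\s*(?P<msg>.*))?$",
    re.MULTILINE,
)


def cause_chain(log):
    """Return (exception class, message) pairs in the order they appear."""
    return [
        (match.group("cls"), (match.group("msg") or "").strip())
        for match in CAUSE.finditer(log)
    ]


sample = """\
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
"""

chain = cause_chain(sample)
root_class, root_message = chain[-1]
print(root_class, "->", root_message)
```

Note that this assumes each `Caused by:` entry sits on a single line, so a hard-wrapped trace would need its lines rejoined first.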