gtwuser commented on issue #5612:
URL: https://github.com/apache/hudi/issues/5612#issuecomment-1129634916

   > Hi All,
   > 
   > We are getting the error `An error occurred while calling o155.pyWriteDynamicFrame. Expected instance of group converter but got "org.apache.parquet.avro.AvroConverters$FieldUTF8Converter"` when trying to upsert a single record.
   > 
   > Please let us know what we are doing wrong and what a possible fix could be.
   > 
   > We are using external jars instead of the [AWS Hudi Connector](https://aws.amazon.com/marketplace/pp/prodview-6rofemcq6erku).
   > 
   > Input source: S3; input data format: JSONL (JSON Lines)
   > 
   > Setup/Env config:
   > 
   > * AWS Glue 2.0
   > * Python 3
   > * Spark 2
   > * External jars for connecting AWS Glue and Hudi:
   >   1. httpclient-4.5.9.jar
   >   2. hudi-spark-bundle_2.11-0.8.0.jar
   >   3. spark-avro_2.11-2.4.4.jar
   > 
   > Hudi Config:
   > 
   > ```python
   > commonConfig = {
   >     'className': 'org.apache.hudi',
   >     'hoodie.datasource.hive_sync.use_jdbc': 'false',
   >     'hoodie.datasource.write.precombine.field': 'MTime',
   >     'hoodie.datasource.write.recordkey.field': 'id',
   >     'hoodie.table.name': 'ny_yellow_trip_data',
   >     'hoodie.consistency.check.enabled': 'true',
   >     'hoodie.datasource.hive_sync.database': args['database_name'],
   >     'hoodie.datasource.hive_sync.table': 'ny_yellow_trip_data' + prefix.replace("/", "_").lower(),
   >     'hoodie.datasource.hive_sync.enable': 'true',
   >     'path': 's3://' + args['curated_bucket'] + prefix,
   >     'hoodie.parquet.small.file.limit': '134217728'  # 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
   > }
   > print("commonConfig:", commonConfig)
   > # unpartitionDataConfig = {
   > #     'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
   > #     'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
   > # }
   > partitionDataConfig = {
   >     'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
   >     'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
   >     'hoodie.datasource.write.partitionpath.field': 'ClassId:SIMPLE',
   >     'hoodie.datasource.hive_sync.partition_fields': 'ClassId'
   > }
   > # initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 68, 'hoodie.datasource.write.operation': 'bulk_insert'}
   > incrementalConfig = {
   >     'hoodie.upsert.shuffle.parallelism': 68,
   >     'hoodie.datasource.write.operation': 'upsert',
   >     'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
   >     'hoodie.cleaner.commits.retained': 10
   > }
   > deleteConfig = {
   >     'hoodie.upsert.shuffle.parallelism': 68,
   >     'hoodie.delete.shuffle.parallelism': 68,
   >     'hoodie.datasource.write.operation': 'delete',
   >     'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
   >     'hoodie.cleaner.commits.retained': 10,
   >     'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload'
   > }
   > ```
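
`partitionDataConfig` uses `CustomKeyGenerator`. Its `hoodie.datasource.write.partitionpath.field` value is written as `field:TYPE` (here `ClassId:SIMPLE`), while `hoodie.datasource.hive_sync.partition_fields` lists plain field names. The sketch below checks that both settings name the same fields. It is a minimal pre-flight check and rests on two assumptions: `SIMPLE` and `TIMESTAMP` are the only key types in use, and the helper names `parse_partition_spec` and `partition_fields_match` are hypothetical (they are not part of Hudi or Glue).

```python
# Hypothetical pre-flight check; only the option keys come from the job config above.
# Assumption: SIMPLE and TIMESTAMP are the CustomKeyGenerator key types in use.
VALID_KEY_TYPES = {"SIMPLE", "TIMESTAMP"}


def parse_partition_spec(spec):
    """Split 'field:TYPE[,field:TYPE...]' into a list of (field, TYPE) tuples."""
    pairs = []
    for part in spec.split(","):
        field, sep, key_type = part.strip().partition(":")
        key_type = key_type.strip().upper()
        if not field or not sep or key_type not in VALID_KEY_TYPES:
            raise ValueError(f"malformed partition path spec: {part!r}")
        pairs.append((field, key_type))
    return pairs


def partition_fields_match(conf):
    """Return True if the Hive sync partition fields equal the written partition fields, in order."""
    spec = conf["hoodie.datasource.write.partitionpath.field"]
    written = [field for field, _ in parse_partition_spec(spec)]
    synced = [f.strip() for f in conf["hoodie.datasource.hive_sync.partition_fields"].split(",")]
    return written == synced


partitionDataConfig = {
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.CustomKeyGenerator",
    "hoodie.datasource.write.partitionpath.field": "ClassId:SIMPLE",
    "hoodie.datasource.hive_sync.partition_fields": "ClassId",
}
print(partition_fields_match(partitionDataConfig))  # True
```

This check only guards the partitioning settings. It does not address the schema error in the trace.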
   > 
   > Code:
   > 
   > ```python
   > combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
   > inputDf = df_map[prefix]  # the actual DataFrame is created via spark.read.json(s3uris[x]) and kept in this map
   > print("total records", inputDf.count())
   > inputDf.printSchema()
   > glueContext.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(inputDf, glueContext, "inputDf"),
   >                                              connection_type="marketplace.spark",
   >                                              connection_options=combinedConf)  # line 170 of the job script: this call fails
   > ```
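
`combinedConf` is built with dict unpacking. When the same key appears in more than one dict, the value from the right-most dict wins. The sketch below performs the same left-to-right merge and also reports which keys were overridden. This can help confirm which `hoodie.datasource.write.operation` actually reaches the writer. `merge_configs` is a hypothetical helper. The example merges the commented-out `initLoadConfig` before `incrementalConfig` purely for illustration; the real job does not do this.

```python
def merge_configs(*configs):
    """Merge option dicts left to right, exactly like {**a, **b, ...}.

    Returns the merged dict and the keys whose earlier value was replaced
    by a different value from a later dict.
    """
    merged, overridden = {}, []
    for conf in configs:
        for key, value in conf.items():
            if key in merged and merged[key] != value and key not in overridden:
                overridden.append(key)
            merged[key] = value
    return merged, overridden


# Values copied from the job config above; merging initLoadConfig here is illustrative only.
initLoadConfig = {"hoodie.bulkinsert.shuffle.parallelism": 68, "hoodie.datasource.write.operation": "bulk_insert"}
incrementalConfig = {
    "hoodie.upsert.shuffle.parallelism": 68,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
    "hoodie.cleaner.commits.retained": 10,
}

merged, overridden = merge_configs(initLoadConfig, incrementalConfig)
print(merged["hoodie.datasource.write.operation"])  # upsert
print(overridden)  # ['hoodie.datasource.write.operation']
```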
   > 
   > **Issue**: On the initial bulk insert the schema was updated, but the next incremental upsert payload was quite small and had empty strings or empty arrays as values. A small snippet showing this is included below.
   > 
   > For comparison, say the initial bulk_insert brought in 10,000 records; the next delta upsert had only 1 or 2 records, and those had empty strings or empty arrays as values.
   > 
   > Example for one of the properties, `LogicalLinks`:
   > 
   > During bulk insert and upsert, the data looked like this:
   > 
   > BULK INSERT:
   > 
   > ```json
   > "LogicalLinks": [
   >     {
   >         "ClassId": "myclassId",
   >         "DbId": 1140,
   >         "IsPresent": true,
   >         "LinkAddress1": "",
   >         "LinkAddress2": "",
   >         "State": "Established",
   >         "Type": "KGF",
   >         "ObjectType": "myObjectType",
   >         "Uptime": "18:14:41"
   >     },
   >     {
   >         "ClassId": "myclassId",
   >         "DbId": 1040,
   >         "IsPresent": false,
   >         "LinkAddress4": "",
   >         "LinkAddress5": "",
   >         "State": "Established",
   >         "Type": "KGF2",
   >         "ObjectType": "myObjectType",
   >         "Uptime": "19:21:41"
   >     }
   > ]
   > ```
   > 
   > UPSERT:
   > 
   > ```json
   > "LogicalLinks": []
   > ```
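
The two `LogicalLinks` samples differ only in whether the array has elements, but that is enough to change the inferred type. The deliberately simplified model of JSON type inference below shows the effect. The `infer_type` function is hypothetical and is not Spark's implementation: an array of objects becomes `array<struct<...>>`, while an empty array has no element to inspect. The rule that an empty array falls back to `array<string>` is an assumption, chosen to match the upsert `printSchema()` output reported in this issue. The bulk sample is a trimmed copy of the first element.

```python
import json


def infer_type(value):
    """Simplified, hypothetical model of JSON schema inference (not Spark's code)."""
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, dict):
        fields = ",".join(f"{k}:{infer_type(v)}" for k, v in sorted(value.items()))
        return f"struct<{fields}>"
    if isinstance(value, list):
        if not value:
            # Assumption: with no element to inspect, fall back to string,
            # matching the upsert printSchema() output in this issue.
            return "array<string>"
        # Simplification: only the first element is inspected.
        return f"array<{infer_type(value[0])}>"
    return "string"  # strings and nulls are both treated as string in this model


bulk = json.loads('{"LogicalLinks": [{"ClassId": "myclassId", "DbId": 1140, "IsPresent": true}]}')
upsert = json.loads('{"LogicalLinks": []}')
print(infer_type(bulk["LogicalLinks"]))    # array<struct<ClassId:string,DbId:long,IsPresent:boolean>>
print(infer_type(upsert["LogicalLinks"]))  # array<string>
```

The schema is inferred separately for each read. A batch in which every `LogicalLinks` value is empty therefore appears to produce a different column type than the one already stored in the table. This seems consistent with the Parquet reader expecting a group (struct) converter and finding a string converter instead.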
   > 
   > Snippet from `printSchema()` for both the bulk insert and the upsert:
   > 
   > BULK INSERT schema:
   > 
   > ```text
   >  |-- LogicalLinks: array (nullable = true)
   >  |    |-- element: struct (containsNull = true)
   >  |    |    |-- ClassId: string (nullable = true)
   >  |    |    |-- DbId: long (nullable = true)
   >  |    |    |-- IsPresent: boolean (nullable = true)
   > ```
   > 
   > NEXT UPSERT schema:
   > 
   > ```text
   >  |-- LogicalLinks: array (nullable = true)
   >  |    |-- element: string (containsNull = true)
   > ```
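
One way to catch this mismatch before calling `write_dynamic_frame` is to compare the incoming DataFrame's schema with the table's schema. PySpark's `StructType.jsonValue()` returns a schema as nested dicts (`{"type": "struct", "fields": [...]}`, with arrays as `{"type": "array", "elementType": ...}`). A plain-Python walk over two such dicts can therefore list the paths whose types disagree. The sketch below is hypothetical in three ways:

- `type_mismatches` and `make_field` are not Spark or Hudi APIs.
- The schema dicts are hand-written and modeled on the `LogicalLinks` samples above.
- Fields present in only one schema are ignored.

```python
def type_mismatches(table_type, incoming_type, path="root"):
    """List paths where two schema trees in StructType.jsonValue() shape disagree on type."""
    # Primitive types are plain strings such as "string", "long" or "boolean".
    if isinstance(table_type, str) or isinstance(incoming_type, str):
        return [] if table_type == incoming_type else [path]
    if table_type["type"] != incoming_type["type"]:
        return [path]
    if table_type["type"] == "array":
        return type_mismatches(table_type["elementType"], incoming_type["elementType"], path + ".element")
    if table_type["type"] == "struct":
        incoming_fields = {f["name"]: f["type"] for f in incoming_type["fields"]}
        mismatches = []
        for entry in table_type["fields"]:
            name = entry["name"]
            if name in incoming_fields:  # fields absent from the batch are skipped
                mismatches += type_mismatches(entry["type"], incoming_fields[name], path + "." + name)
        return mismatches
    # Any other complex type (e.g. map): fall back to an exact comparison.
    return [] if table_type == incoming_type else [path]


def make_field(name, type_):
    """Build one field entry in StructType.jsonValue() shape (hypothetical helper)."""
    return {"name": name, "type": type_, "nullable": True, "metadata": {}}


# Hand-written: array of structs in the table, array of strings in the upsert batch.
link_struct = {"type": "struct", "fields": [make_field("ClassId", "string"),
                                            make_field("DbId", "long"),
                                            make_field("IsPresent", "boolean")]}
table_schema = {"type": "struct", "fields": [
    make_field("LogicalLinks", {"type": "array", "elementType": link_struct, "containsNull": True})]}
batch_schema = {"type": "struct", "fields": [
    make_field("LogicalLinks", {"type": "array", "elementType": "string", "containsNull": True})]}

print(type_mismatches(table_schema, batch_schema))  # ['root.LogicalLinks.element']
```

In the Glue job, the same walk could be run on `inputDf.schema.jsonValue()` against the schema of a DataFrame read back from the Hudi table. A possible mitigation, not verified here, is to read the incoming JSON with the table's schema via `spark.read.schema(...)`, so that empty arrays are not re-inferred as `array<string>`.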
   > 
   > Bulk insert works fine.
   > 
   > Exception trace during upsert:
   > 
   > ```text
   > 2022-05-17 19:33:31,901 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
   >   File "/tmp/first-upsert-delete.py", line 170, in <module>
   >     connection_options=combinedConf)
   >   File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 656, in from_options
   >     format_options, transformation_ctx)
   >   File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 324, in write_dynamic_frame_from_options
   >     format, format_options, transformation_ctx)
   >   File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 347, in write_from_options
   >     return sink.write(frame_or_dfc)
   >   File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
   >     return self.writeFrame(dynamic_frame_or_dfc, info)
   >   File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
   >     return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
   >   File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
   >     answer, self.gateway_client, self.target_id, self.name)
   >   File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
   >     return f(*a, **kw)
   >   File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
   >     format(target_id, ".", name), value)
   > py4j.protocol.Py4JJavaError: An error occurred while calling o155.pyWriteDynamicFrame.
   > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 35.0 failed 4 times, most recent failure: Lost task 0.3 in stage 35.0 (TID 3464, 172.36.237.190, executor 14): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   >    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:288)
   >    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:139)
   >    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
   >    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
   >    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
   >    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
   >    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   >    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   >    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   >    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   >    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   >    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
   >    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
   >    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
   >    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
   >    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
   >    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
   >    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
   >    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
   >    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
   >    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   >    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   >    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   >    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   >    at org.apache.spark.scheduler.Task.run(Task.scala:121)
   >    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
   >    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
   >    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
   >    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   >    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   >    at java.lang.Thread.run(Thread.java:748)
   > Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
   >    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
   >    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:317)
   >    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:308)
   >    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:281)
   >    ... 30 more
   > Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
   >    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
   >    ... 33 more
   > Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
   >    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
   >    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
   >    ... 34 more
   > Caused by: org.apache.hudi.exception.HoodieException: operation has failed
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:247)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:277)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
   >    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   >    ... 3 more
   > Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://hudi-curated-bucket-65dd6070/niatelemetry/NiaInventoryFabrics/niatelemetry.NiaInventoryFabric/b5218000-a16a-45ed-b90c-f33c3964b670-0_0-104-1179_20220517191800.parquet
   >    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
   >    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
   >    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
   >    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
   >    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
   >    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   >    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   >    ... 4 more
   > Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.parquet.avro.AvroConverters$FieldUTF8Converter"
   >    at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
   >    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
   >    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
   >    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
   >    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
   >    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
   >    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
   >    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
   >    ... 11 more
   > 
   > Driver stacktrace:
   >    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
   >    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
   >    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
   >    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   >    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
   >    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
   >    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
   >    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
   >    at scala.Option.foreach(Option.scala:257)
   >    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
   >    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
   >    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
   >    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
   >    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
   >    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
   >    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
   >    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
   >    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
   >    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
   >    at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
   >    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:470)
   >    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:222)
   >    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
   >    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
   >    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
   >    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
   >    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
   >    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
   >    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
   >    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
   >    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   >    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
   >    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
   >    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
   >    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
   >    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
   >    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
   >    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
   >    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
   >    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
   >    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
   >    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
   >    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
   >    at com.amazonaws.services.glue.marketplace.connector.SparkCustomDataSink.writeDynamicFrame(CustomDataSink.scala:43)
   >    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:65)
   >    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   >    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   >    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   >    at java.lang.reflect.Method.invoke(Method.java:498)
   >    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   >    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   >    at py4j.Gateway.invoke(Gateway.java:282)
   >    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   >    at py4j.commands.CallCommand.execute(CallCommand.java:79)
   >    at py4j.GatewayConnection.run(GatewayConnection.java:238)
   >    at java.lang.Thread.run(Thread.java:748)
   > Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   >    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:288)
   >    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:139)
   >    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
   >    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
   >    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
   >    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
   >    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   >    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   >    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   >    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   >    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   >    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
   >    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
   >    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
   >    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
   >    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
   >    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
   >    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
   >    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
   >    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
   >    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   >    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   >    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   >    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   >    at org.apache.spark.scheduler.Task.run(Task.scala:121)
   >    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
   >    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
   >    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
   >    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   >    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   >    ... 1 more
   > Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
   >    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
   >    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:317)
   >    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:308)
   >    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:281)
   >    ... 30 more
   > Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
   >    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
   >    ... 33 more
   > Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
   >    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
   >    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
   >    ... 34 more
   > Caused by: org.apache.hudi.exception.HoodieException: operation has failed
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:247)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:277)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
   >    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   >    ... 3 more
   > Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://hudi-curated-bucket-65dd6070/niatelemetry/NiaInventoryFabrics/niatelemetry.NiaInventoryFabric/b5218000-a16a-45ed-b90c-f33c3964b670-0_0-104-1179_20220517191800.parquet
   >    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
   >    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
   >    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
   >    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
   >    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
   >    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
   >    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   >    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   >    ... 4 more
   > Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.parquet.avro.AvroConverters$FieldUTF8Converter"
   >    at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
   >    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
   >    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
   >    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
   >    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
   >    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
   >    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
   >    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
   >    ... 11 more
   > ```
   I have added a sample of the payload sent in the bulk insert and in the next upsert. The basic scenario is that the increments may contain only empty arrays for properties that had data during the bulk insert.
   

