gtwuser commented on issue #5612: URL: https://github.com/apache/hudi/issues/5612#issuecomment-1129634916
> Hi All,
>
> Getting this error `An error occurred while calling o155.pyWriteDynamicFrame. Expected instance of group converter but got "org.apache.parquet.avro.AvroConverters$FieldUTF8Converter"` when trying to upsert a single record.
>
> Please let us know what we are doing wrong and what a possible fix could be.
>
> We are using external jars instead of the [AWS Hudi Connector](https://aws.amazon.com/marketplace/pp/prodview-6rofemcq6erku).
>
> Input source: S3
> Input data format: JSONL (JSON Lines)
>
> Setup/Env config:
>
> * AWS Glue 2.0
> * Python 3
> * Spark 2
> * External dependency jars for connecting AWS Glue and Hudi:
>   1. httpclient-4.5.9.jar
>   2. hudi-spark-bundle_2.11-0.8.0.jar
>   3. spark-avro_2.11-2.4.4.jar
>
> Hudi Config:
>
> ```python
> commonConfig = {
>     'className': 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc': 'false',
>     'hoodie.datasource.write.precombine.field': 'MTime',
>     'hoodie.datasource.write.recordkey.field': 'id',
>     'hoodie.table.name': 'ny_yellow_trip_data', 'hoodie.consistency.check.enabled': 'true',
>     'hoodie.datasource.hive_sync.database': args['database_name'],
>     'hoodie.datasource.hive_sync.table': 'ny_yellow_trip_data' + prefix.replace("/", "_").lower(),
>     'hoodie.datasource.hive_sync.enable': 'true', 'path': 's3://' + args['curated_bucket'] + prefix,
>     'hoodie.parquet.small.file.limit': '134217728'  # 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
> }
> print("commonConfig:", commonConfig)
> # unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor', 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
> partitionDataConfig = {
>     'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
>     'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
>     'hoodie.datasource.write.partitionpath.field': 'ClassId:SIMPLE',
>     'hoodie.datasource.hive_sync.partition_fields': 'ClassId'
> }
> # initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 68, 'hoodie.datasource.write.operation': 'bulk_insert'}
> incrementalConfig = {
>     'hoodie.upsert.shuffle.parallelism': 68, 'hoodie.datasource.write.operation': 'upsert',
>     'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 10
> }
> deleteConfig = {
>     'hoodie.upsert.shuffle.parallelism': 68, 'hoodie.delete.shuffle.parallelism': 68, 'hoodie.datasource.write.operation': 'delete',
>     'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 10,
>     'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload'
> }
> ```
>
> Code:
>
> ```python
> combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
> inputDf = df_map[prefix]  # the actual dataframe is created via spark.read.json(s3uris[x]) and then kept in this map
> print("total records", inputDf.count())
> inputDf.printSchema()
> glueContext.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(inputDf, glueContext, "inputDf"),
>                                              connection_type="marketplace.spark",
>                                              connection_options=combinedConf)  # line 170 of the script: this call fails
> ```
>
> **Issue**: On the initial bulk insert the schema was updated, but the next incremental upsert payload was quite small and had empty strings or empty arrays as values. A small snippet showing this is shared below.
>
> For comparison: the initial bulk_insert brought in 10,000 records, while the next delta upsert had only 1 or 2 records, and those contained empty strings or empty arrays as values.
>
> Example for one of the properties, `LogicalLinks`:
>
> During bulk insert and upsert the data looked like this:
>
> BULK INSERT:
>
> ```json
> "LogicalLinks": [
>     {
>         "ClassId": "myclassId",
>         "DbId": 1140,
>         "IsPresent": true,
>         "LinkAddress1": "",
>         "LinkAddress2": "",
>         "State": "Established",
>         "Type": "KGF",
>         "ObjectType": "myObjectType",
>         "Uptime": "18:14:41"
>     },
>     {
>         "ClassId": "myclassId",
>         "DbId": 1040,
>         "IsPresent": false,
>         "LinkAddress4": "",
>         "LinkAddress5": "",
>         "State": "Established",
>         "Type": "KGF2",
>         "ObjectType": "myObjectType",
>         "Uptime": "19:21:41"
>     }
> ]
> ```
>
> UPSERT:
>
> ```json
> "LogicalLinks": []
> ```
>
> Snippet from `printSchema()` for both `bulk insert` and `upsert`:
>
> BULK INSERT schema:
>
> ```text
> |-- LogicalDate: array (nullable = true)
> |    |-- element: struct (containsNull = true)
> |    |    |-- ClassId: string (nullable = true)
> |    |    |-- DobId: long (nullable = true)
> |    |    |-- IsPresent: boolean (nullable = true)
> ```
>
> NEXT UPSERT schema:
>
> ```text
> |-- LogicalLinks: array (nullable = true)
> |    |-- element: string (containsNull = true)
> ```
>
> Bulk insert works fine.
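The two `printSchema()` outputs above differ in the element type of the array. A likely reason is that an empty JSON array carries no information about its elements, so inference on the small upsert batch has nothing to build the struct from. The sketch below illustrates that idea in plain Python, without Spark. `infer_type` and `conflicting_columns` are hypothetical helpers written only for this illustration, and the fallback to `string` for an empty list is an assumption chosen to match the `element: string` line in the upsert schema; this is not Spark's actual inference code.

```python
import json


def infer_type(value):
    """Return a simplified type description for a parsed JSON value.

    Hypothetical helper: a rough stand-in for schema inference,
    not the logic Spark uses internally.
    """
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        # Assumption: an empty list gives no element information, so fall
        # back to "string", as seen in the upsert printSchema() output.
        element = infer_type(value[0]) if value else "string"
        return {"array": element}
    if isinstance(value, dict):
        return {"struct": {k: infer_type(v) for k, v in value.items()}}
    return "null"


def conflicting_columns(bulk_record, upsert_record):
    """List top-level columns whose inferred types differ between two records."""
    bulk = {k: infer_type(v) for k, v in bulk_record.items()}
    upsert = {k: infer_type(v) for k, v in upsert_record.items()}
    return sorted(k for k in bulk.keys() & upsert.keys() if bulk[k] != upsert[k])


# Trimmed-down records modelled on the payloads shown in the issue.
bulk = json.loads(
    '{"id": 1, "LogicalLinks": [{"ClassId": "myclassId", "DbId": 1140, "IsPresent": true}]}'
)
upsert = json.loads('{"id": 1, "LogicalLinks": []}')

print(conflicting_columns(bulk, upsert))
```

If this is indeed the cause, one option worth trying is to pass the schema of the bulk-insert data explicitly through the `schema` argument of `spark.read.json` instead of relying on inference for each batch. Whether that resolves the Hudi merge error in this setup is untested here.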
>
> Exception trace during upsert:
>
> ```text
> 2022-05-17 19:33:31,901 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
>   File "/tmp/first-upsert-delete.py", line 170, in <module>
>     connection_options=combinedConf)
>   File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 656, in from_options
>     format_options, transformation_ctx)
>   File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 324, in write_dynamic_frame_from_options
>     format, format_options, transformation_ctx)
>   File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 347, in write_from_options
>     return sink.write(frame_or_dfc)
>   File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
>     return self.writeFrame(dynamic_frame_or_dfc, info)
>   File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
>     return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
>   File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>     return f(*a, **kw)
>   File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o155.pyWriteDynamicFrame.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 35.0 failed 4 times, most recent failure: Lost task 0.3 in stage 35.0 (TID 3464, 172.36.237.190, executor 14): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:288)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:139)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>     at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:317)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:308)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:281)
>     ... 30 more
> Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
>     at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
>     ... 33 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
>     ... 34 more
> Caused by: org.apache.hudi.exception.HoodieException: operation has failed
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:247)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:277)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     ... 3 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://hudi-curated-bucket-65dd6070/niatelemetry/NiaInventoryFabrics/niatelemetry.NiaInventoryFabric/b5218000-a16a-45ed-b90c-f33c3964b670-0_0-104-1179_20220517191800.parquet
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
>     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>     at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     ... 4 more
> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.parquet.avro.AvroConverters$FieldUTF8Converter"
>     at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
>     at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>     at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>     at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
>     ... 11 more
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>     at scala.Option.foreach(Option.scala:257)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
>     at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
>     at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:470)
>     at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:222)
>     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
>     at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>     at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>     at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>     at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>     at com.amazonaws.services.glue.marketplace.connector.SparkCustomDataSink.writeDynamicFrame(CustomDataSink.scala:43)
>     at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:65)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at py4j.Gateway.invoke(Gateway.java:282)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:288)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:139)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>     at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:317)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:308)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:281)
>     ... 30 more
> Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
>     at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
>     ... 33 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
>     ... 34 more
> Caused by: org.apache.hudi.exception.HoodieException: operation has failed
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:247)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:277)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     ... 3 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3://hudi-curated-bucket-65dd6070/niatelemetry/NiaInventoryFabrics/niatelemetry.NiaInventoryFabric/b5218000-a16a-45ed-b90c-f33c3964b670-0_0-104-1179_20220517191800.parquet
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
>     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>     at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     ... 4 more
> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.parquet.avro.AvroConverters$FieldUTF8Converter"
>     at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
>     at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>     at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>     at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
>     ... 11 more
> ```

I have added a sample of the payload sent in the bulk insert and in the next upsert. The basic scenario is that the increments may contain only empty arrays for properties that had data during the bulk insert.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
