aznwarmonkey opened a new issue #4541:
URL: https://github.com/apache/hudi/issues/4541
Hello,
I am currently getting an exception while writing a `hudi` table in
`bulk_insert` mode. Please see below for the stack trace along with the snippet
of code I am using to write the data. I am new to `hudi`, and this stack trace
doesn't provide much insight into why it is happening. Any help with this
issue is greatly appreciated.
```bash
py4j.protocol.Py4JJavaError: An error occurred while calling o195.save.
: org.apache.spark.SparkException: Writing job failed.
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:383)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:336)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:218)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:370)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow(HoodieSparkSqlWriter.scala:302)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:127)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:134)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: java.lang.NullPointerException
    at org.apache.hudi.internal.DataSourceInternalWriterHelper.commit(DataSourceInternalWriterHelper.java:83)
    at org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite.commit(HoodieDataSourceInternalBatchWrite.java:84)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
    ... 69 more
    Suppressed: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://path/to/data/ commits 20220108133454
        at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:563)
        at org.apache.hudi.internal.DataSourceInternalWriterHelper.abort(DataSourceInternalWriterHelper.java:91)
        at org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite.abort(HoodieDataSourceInternalBatchWrite.java:89)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:378)
        ... 69 more
    Caused by: org.apache.hudi.exception.HoodieRollbackException: Found in-flight commits after time :20220108133454, please rollback greater commits first
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.validateRollbackCommitSequence(BaseRollbackActionExecutor.java:148)
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.doRollbackAndGetStats(BaseRollbackActionExecutor.java:166)
        at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:103)
        at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:230)
        at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:552)
        ... 72 more
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.NullPointerException
    at java.io.StringReader.<init>(StringReader.java:50)
    at org.apache.avro.Schema$Parser.parse(Schema.java:1020)
    at org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor.lambda$runClusteringForGroupAsync$3(SparkExecuteClusteringCommitActionExecutor.java:118)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 5 more
```
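The suppressed rollback error ("Found in-flight commits after time :20220108133454, please rollback greater commits first") suggests there are still pending instants in the table's timeline. For reference, the pending instants can be listed directly from the `.hoodie` timeline folder; this is only a diagnostic sketch and assumes the AWS CLI is configured (the bucket path is the redacted base path from the stack trace above):

```python
import subprocess

# Diagnostic sketch: list the Hudi timeline to see which instants are still
# pending. "s3://path/to/data/" is the redacted base path from the trace above.
result = subprocess.run(
    ["aws", "s3", "ls", "s3://path/to/data/.hoodie/"],
    capture_output=True, text=True, check=True,
)
# Look for *.inflight or *.requested files at or after instant 20220108133454.
print(result.stdout)
```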
This is how I am writing out the data:
```python
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.keygenerator.class':
'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.datasource.write.recordkey.field': keys,
'hoodie.datasource.write.partitionpath.field':
','.join(partitions),
'hoodie.datasource.write.hive_style_partitioning': True,
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.precombine.field': timestamp_col,
'hoodie.index.type': 'BLOOM',
'hoodie.consistency.check.enabled': True,
'hoodie.bloom.index.bucketized.checking': True,
'hoodie.bloom.index.keys.per.bucket': 50000000,
'hoodie.index.bloom.num_entries': 1000000,
'hoodie.bloom.index.use.caching': True,
'hoodie.bloom.index.use.treebased.filter': True,
'hoodie.bloom.index.filter.type': 'DYNAMIC_V0',
'hoodie.bloom.index.filter.dynamic.max.entries': 1000000,
'hoodie.bloom.index.prune.by.ranges': True,
'hoodie.parquet.small.file.limit': 134217728,
'hoodie.parquet.max.file.size': 1073741824,
'write.bulk_insert.shuffle_by_partition': True,
'write.parquet.block.size': 256,
'hoodie.copyonwrite.insert.auto.split': False,
'hoodie.copyonwrite.insert.split.size': 10000000,
'hoodie.datasource.write.row.writer.enable': True,
'hoodie.bulkinsert.sort.mode': 'PARTITION_SORT',
'hoodie.bulkinsert.shuffle.parallelism': 200,
'hoodie.clustering.inline': True,
'hoodie.clustering.inline.max.commits': '1',
'hoodie.clustering.plan.strategy.small.file.limit': '1073741824',
'hoodie.clustering.plan.strategy.target.file.max.bytes':
'2147483648',
'hoodie.clustering.execution.strategy.class':
'org.apache.hudi.client.clustering.run.strategy'
'.SparkSortAndSizeExecutionStrategy',
'hoodie.clustering.plan.strategy.sort.columns': sort_cols,
'hoodie.cleaner.commits.retained': '2',
'hoodie.clean.async': True,
}
df.write.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'bulk_insert') \
.options(**hudi_options).mode('append').save(output_path)
```
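Since the innermost `Caused by` is a `NullPointerException` thrown from `org.apache.avro.Schema$Parser.parse` inside `SparkExecuteClusteringCommitActionExecutor`, the failure appears to surface during the inline clustering commit triggered by the `hoodie.clustering.*` options above. One way to isolate that would be a test write with those options dropped; this is only a sketch that reuses the same `df`, `hudi_options`, and `output_path` from the snippet above, and is not meant as a fix:

```python
# Sketch: re-run the same bulk insert with the inline clustering options removed
# to check whether the NullPointerException only occurs during the clustering commit.
test_options = {
    k: v for k, v in hudi_options.items()
    if not k.startswith('hoodie.clustering.')
}

df.write.format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'bulk_insert') \
    .options(**test_options).mode('append').save(output_path)
```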