[ 
https://issues.apache.org/jira/browse/HUDI-3058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-3058:
---------------------------------
    Labels: pull-request-available sev:high  (was: sev:high)

> SqlQueryEqualityPreCommitValidator errors with 
> java.util.ConcurrentModificationException
> ----------------------------------------------------------------------------------------
>
>                 Key: HUDI-3058
>                 URL: https://issues.apache.org/jira/browse/HUDI-3058
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Usability
>    Affects Versions: 0.10.0
>            Reporter: sivabalan narayanan
>            Assignee: satish
>            Priority: Major
>              Labels: pull-request-available, sev:high
>             Fix For: 0.11.0
>
>
> Ref issue: [https://github.com/apache/hudi/issues/4109]
>  
> Faced a ConcurrentModificationException when trying to test 
> SqlQueryEqualityPreCommitValidator in the quickstart guide
> *To Reproduce*
> Steps to reproduce the behavior:
>  # Insert data without any pre commit validations
>  # Update data (ensured the updates don't touch the fare column in quickstart 
> example) with the following precommit validator props
> {{option("hoodie.precommit.validators", 
> "org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator").
> option("hoodie.precommit.validators.equality.sql.queries", "select sum(fare) 
> from <TABLE_NAME>").}}
> stacktrace:
> {code:java}
> org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit 
> time 20211124114945342
> at 
> org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)
> at 
> org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)
> at 
> org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:111)
> at 
> org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:95)
> at 
> org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:174)
> at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)
> at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:276)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
> at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at 
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at 
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> ... 70 elided
> Caused by: java.util.ConcurrentModificationException
> at java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1633)
> at 
> java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:743)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at 
> org.apache.hudi.client.utils.SparkValidatorUtils.getRecordsFromPendingCommits(SparkValidatorUtils.java:159)
> at 
> org.apache.hudi.client.utils.SparkValidatorUtils.runValidators(SparkValidatorUtils.java:78)
> at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.runPrecommitValidators(BaseSparkCommitActionExecutor.java:401)
> at 
> org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:140)
> at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:267)
> at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:182)
> at 
> org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:82)
> at 
> org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:55)
> ... 98 more {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to