soumilshah1995 commented on issue #8977:
URL: https://github.com/apache/hudi/issues/8977#issuecomment-1602559011
Aditya gave me the JAR file on Slack and I am trying the sample code out.
I am still confused: I was expecting it to throw an error only when `message` is null.
Here is sample code for you to replicate:
```
try:
    from pyspark.sql import SparkSession
    import os
    import sys
    import uuid
    from datetime import datetime
    from faker import Faker
except Exception as e:
    print("Error: ", e)

hudi_version = '0.13.1'
jar_file = 'hudi-spark3.3-bundle_2.12-0.14.0-SNAPSHOT.jar'

os.environ['PYSPARK_SUBMIT_ARGS'] = f"--jars {jar_file} pyspark-shell"
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.jars', jar_file) \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

# Create sample data
db_name = "hudidb"
table_name = "test"
recordkey = 'uuid'
precombine = 'precomb'
method = 'upsert'
table_type = "COPY_ON_WRITE"

validator_query = "SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL;"

path = f"file:///C:/tmp/{db_name}/{table_name}"

hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    "hoodie.precommit.validators": "org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator",
    "hoodie.precommit.validators.equality.sql.queries": validator_query
}

try:
    print("Trying Append 1")
    spark_df = spark.createDataFrame(
        data=[
            (1, "This is APPEND 1", 111, "1"),
            (2, "This is APPEND 2", 222, "2"),
            (3, "This is APPEND 5", 222, "3"),
        ],
        schema=["uuid", "message", "precomb", "partition"])
    spark_df.show()

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
    print("Append 1 Success....")
except Exception as e:
    print("Failed to UPSERT", e)

try:
    print("Trying Append 2")
    spark_df = spark.createDataFrame(
        data=[
            (4, None, 444, None),
            (5, "This is APPEND 5", 555, "5"),
        ],
        schema=["uuid", "message", "precomb", "partition"])
    spark_df.show()

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
    print("Append 2 Success....")
except Exception as e:
    print("Failed to UPSERT", e)
```
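For context on why I think both appends fail (and not just the one with the null `message`): my reading of the log output below, which is an assumption on my part and not Hudi's actual source, is that the equality validator runs the configured query against the table state before and after the commit and fails whenever the two results differ. Since the before-state is an empty table, the "expected" count is 0 and any non-null rows in the new commit make it fail. A minimal sketch of that semantics, with hypothetical names:

```
# Hypothetical sketch (NOT Hudi's real code) of equality-validator semantics:
# the query result must be identical on the before-state and after-state.

class HoodieValidationException(Exception):
    """Stand-in for org.apache.hudi.exception.HoodieValidationException."""


def run_equality_validator(count_before: int, count_after: int, query: str) -> None:
    # Equality check: pass only when the query result does not change
    # across the commit; otherwise raise, which aborts the commit.
    if count_before != count_after:
        raise HoodieValidationException(
            f"Query validation failed for '{query}'")


# Append 1: before-state is empty (count 0), after-state has 3 non-null
# messages, so the equality check fails even though no message is null.
try:
    run_equality_validator(
        0, 3, "SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL")
except HoodieValidationException as e:
    print("validation failed:", e)
```

If that reading is right, this validator cannot express "fail when `message` is null" on a growing table; that would need a query whose result stays constant across commits.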
# logs
```
C:\Users\s.shah\IdeaProjects\DBT\venv\Scripts\python.exe "C:\Users\s.shah\IdeaProjects\DBT\Hudi Ut\sam.py"
23/06/22 08:31:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Trying Append 1
23/06/22 08:32:01 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
+----+----------------+-------+---------+
|uuid| message|precomb|partition|
+----+----------------+-------+---------+
| 1|This is APPEND 1| 111| 1|
| 2|This is APPEND 2| 222| 2|
| 3|This is APPEND 5| 222| 3|
+----+----------------+-------+---------+
23/06/22 08:32:13 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
23/06/22 08:32:13 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
23/06/22 08:32:14 WARN HoodieBackedTableMetadata: Metadata table was not found at path file:/C:/tmp/hudidb/test/.hoodie/metadata
23/06/22 08:32:19 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
23/06/22 08:32:21 WARN HoodieTimelineArchiver: Error parsing instant time: 00000000000000010
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
23/06/22 08:32:46 WARN BaseSparkCommitActionExecutor: Cannot get table schema from before state.
java.lang.IllegalArgumentException: Could not find any data file written for commit, so could not get schema for table file:/C:/tmp/hudidb/test
    at org.apache.hudi.common.table.TableSchemaResolver.getTableParquetSchemaFromDataFile(TableSchemaResolver.java:275)
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:116)
    at org.apache.hudi.common.table.TableSchemaResolver.lambda$getTableAvroSchemaInternal$3(TableSchemaResolver.java:206)
    at org.apache.hudi.common.util.Option.orElseGet(Option.java:149)
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaInternal(TableSchemaResolver.java:205)
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:137)
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:126)
    at org.apache.hudi.client.utils.SparkValidatorUtils.getRecordsFromCommittedFiles(SparkValidatorUtils.java:142)
    at org.apache.hudi.client.utils.SparkValidatorUtils.runValidators(SparkValidatorUtils.java:83)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.runPrecommitValidators(BaseSparkCommitActionExecutor.java:434)
    at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:180)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:279)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:184)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:141)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:215)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:382)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:578)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:1589)
23/06/22 08:32:46 WARN BaseSparkCommitActionExecutor: Use the schema from after state (current transaction) to create the empty Spark dataframe:
StructType(StructField(_hoodie_commit_time,StringType,true),StructField(_hoodie_commit_seqno,StringType,true),StructField(_hoodie_record_key,StringType,true),StructField(_hoodie_partition_path,StringType,true),StructField(_hoodie_file_name,StringType,true),StructField(uuid,LongType,true),StructField(message,StringType,true),StructField(precomb,LongType,true),StructField(partition,StringType,true))
23/06/22 08:32:49 ERROR SqlQueryEqualityPreCommitValidator: query validation failed. See stdout for sample query results. Query: SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL
Expected result (sample records only):
+--------+
|count(1)|
+--------+
| 0|
+--------+
Actual result (sample records only):
+--------+
|count(1)|
+--------+
| 3|
+--------+
23/06/22 08:32:49 ERROR BaseSparkCommitActionExecutor: validation failed for org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator
org.apache.hudi.exception.HoodieValidationException: Query validation failed for 'SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL'. See stdout for expected vs actual records
    at org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator.validateUsingQuery(SqlQueryEqualityPreCommitValidator.java:73)
    at org.apache.hudi.client.validator.SqlQueryPreCommitValidator.lambda$validateRecordsBeforeAndAfter$0(SqlQueryPreCommitValidator.java:69)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1006)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
    at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:667)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:765)
    at org.apache.hudi.client.validator.SqlQueryPreCommitValidator.validateRecordsBeforeAndAfter(SqlQueryPreCommitValidator.java:68)
    at org.apache.hudi.client.validator.SparkPreCommitValidator.validate(SparkPreCommitValidator.java:82)
    at org.apache.hudi.client.utils.SparkValidatorUtils.lambda$runValidatorAsync$2(SparkValidatorUtils.java:109)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1840)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
23/06/22 08:32:49 ERROR BaseSparkCommitActionExecutor: At least one pre-commit validation failed
Failed to UPSERT An error occurred while calling o58.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20230622083213832
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:74)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:141)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:215)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:382)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:578)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: org.apache.hudi.exception.HoodieValidationException: At least one pre-commit validation failed
    at org.apache.hudi.client.utils.SparkValidatorUtils.runValidators(SparkValidatorUtils.java:97)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.runPrecommitValidators(BaseSparkCommitActionExecutor.java:434)
    at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:180)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:279)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:184)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
    ... 46 more
Trying Append 2
+----+----------------+-------+---------+
|uuid| message|precomb|partition|
+----+----------------+-------+---------+
| 4| null| 444| null|
| 5|This is APPEND 5| 555| 5|
+----+----------------+-------+---------+
23/06/22 08:33:28 WARN BaseSparkCommitActionExecutor: Cannot get table schema from before state.
java.lang.IllegalArgumentException: Could not find any data file written for commit, so could not get schema for table file:/C:/tmp/hudidb/test
    at org.apache.hudi.common.table.TableSchemaResolver.getTableParquetSchemaFromDataFile(TableSchemaResolver.java:275)
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:116)
    at org.apache.hudi.common.table.TableSchemaResolver.lambda$getTableAvroSchemaInternal$3(TableSchemaResolver.java:206)
    at org.apache.hudi.common.util.Option.orElseGet(Option.java:149)
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaInternal(TableSchemaResolver.java:205)
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:137)
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:126)
    at org.apache.hudi.client.utils.SparkValidatorUtils.getRecordsFromCommittedFiles(SparkValidatorUtils.java:142)
    at org.apache.hudi.client.utils.SparkValidatorUtils.runValidators(SparkValidatorUtils.java:83)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.runPrecommitValidators(BaseSparkCommitActionExecutor.java:434)
    at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:180)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:279)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:184)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:141)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:215)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:382)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:578)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:1589)
23/06/22 08:33:28 WARN BaseSparkCommitActionExecutor: Use the schema from after state (current transaction) to create the empty Spark dataframe:
StructType(StructField(_hoodie_commit_time,StringType,true),StructField(_hoodie_commit_seqno,StringType,true),StructField(_hoodie_record_key,StringType,true),StructField(_hoodie_partition_path,StringType,true),StructField(_hoodie_file_name,StringType,true),StructField(uuid,LongType,true),StructField(message,StringType,true),StructField(precomb,LongType,true),StructField(partition,StringType,true))
23/06/22 08:33:29 ERROR SqlQueryEqualityPreCommitValidator: query validation failed. See stdout for sample query results. Query: SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL
Expected result (sample records only):
+--------+
|count(1)|
+--------+
| 0|
+--------+
Actual result (sample records only):
+--------+
|count(1)|
+--------+
| 1|
+--------+
23/06/22 08:33:29 ERROR BaseSparkCommitActionExecutor: validation failed for org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator
org.apache.hudi.exception.HoodieValidationException: Query validation failed for 'SELECT COUNT(*) FROM <TABLE_NAME> WHERE message IS NOT NULL'. See stdout for expected vs actual records
    at org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator.validateUsingQuery(SqlQueryEqualityPreCommitValidator.java:73)
    at org.apache.hudi.client.validator.SqlQueryPreCommitValidator.lambda$validateRecordsBeforeAndAfter$0(SqlQueryPreCommitValidator.java:69)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1006)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
    at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:667)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:765)
    at org.apache.hudi.client.validator.SqlQueryPreCommitValidator.validateRecordsBeforeAndAfter(SqlQueryPreCommitValidator.java:68)
    at org.apache.hudi.client.validator.SparkPreCommitValidator.validate(SparkPreCommitValidator.java:82)
    at org.apache.hudi.client.utils.SparkValidatorUtils.lambda$runValidatorAsync$2(SparkValidatorUtils.java:109)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1840)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
23/06/22 08:33:29 ERROR BaseSparkCommitActionExecutor: At least one pre-commit validation failed
Failed to UPSERT An error occurred while calling o88.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20230622083306135
    at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:74)
    at org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:109)
    at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)
    at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:141)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:215)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:382)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:153)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:578)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: org.apache.hudi.exception.HoodieValidationException: At least one
pre-commit validation failed
at
org.apache.hudi.client.utils.SparkValidatorUtils.runValidators(SparkValidatorUtils.java:97)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.runPrecommitValidators(BaseSparkCommitActionExecutor.java:434)
at
org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:180)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:279)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:184)
at
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:86)
at
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:67)
... 46 more
23/06/22 08:33:29 WARN DiskMap: Error while deleting the disk map directory=/tmp//hudi-BITCASK-699636ae-d2e5-49f2-89f8-01424ebc6d34
java.io.IOException: Unable to delete directory \tmp\hudi-BITCASK-699636ae-d2e5-49f2-89f8-01424ebc6d34
at org.apache.hudi.common.util.FileIOUtils.deleteDirectory(FileIOUtils.java:59)
at org.apache.hudi.common.util.collection.DiskMap.cleanup(DiskMap.java:100)
at org.apache.hudi.common.util.collection.DiskMap.cleanup(DiskMap.java:92)
at java.base/java.lang.Thread.run(Thread.java:1589)
23/06/22 08:33:29 WARN DiskMap: Error while deleting the disk map directory=/tmp//hudi-BITCASK-6612ee96-763f-4aa4-aa49-6e70e9f54153
java.io.IOException: Unable to delete directory \tmp\hudi-BITCASK-6612ee96-763f-4aa4-aa49-6e70e9f54153
at org.apache.hudi.common.util.FileIOUtils.deleteDirectory(FileIOUtils.java:59)
at org.apache.hudi.common.util.collection.DiskMap.cleanup(DiskMap.java:100)
at org.apache.hudi.common.util.collection.DiskMap.cleanup(DiskMap.java:92)
at java.base/java.lang.Thread.run(Thread.java:1589)
Process finished with exit code 0
```
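Note that the trace above shows the pre-commit validator did fire: the root cause buried under `Caused by:` is `HoodieValidationException: At least one pre-commit validation failed`. On the Python side, py4j surfaces this wrapped inside a generic Java-error exception rather than as a distinct Python exception type, which is easy to miss. A minimal sketch of how one might detect it when wrapping the write call — `is_validation_failure` is an illustrative helper of mine, not a Hudi or py4j API:

```python
# Hedged sketch: spot a Hudi pre-commit validation failure inside the
# Java stack trace that py4j attaches to the Python-side error.
# `is_validation_failure` is a hypothetical helper, not part of Hudi.

def is_validation_failure(java_trace: str) -> bool:
    """Return True if the wrapped Java trace reports a failed pre-commit validator."""
    return "org.apache.hudi.exception.HoodieValidationException" in java_trace


# Usage around the Hudi write (py4j raises an error whose str() contains
# the full Java stack trace shown above):
#
# try:
#     spark_df.write.format("hudi").options(**hudi_options).mode("append").save(path)
# except Exception as e:
#     if is_validation_failure(str(e)):
#         print("Pre-commit validation failed; the commit was rolled back")
#     else:
#         raise
```

With `hoodie.precommit.validators` set as in the sample above, a write whose committed data would violate the configured SQL query is expected to abort with this exception rather than silently succeed.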
# jar links
https://apache-hudi.slack.com/files/U04U11G3LK0/F05DMRB5KPT/hudi-spark3.3-bundle_2.12-0.14.0-snapshot.jar
CC @ad1happy2go
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]